Cramming for the Journeyman Plumber Exam

Part II: CMS Pipelines Initiation and Termination

Melinda Varian (PU)

Office of Computing and Information Technology
Princeton University
87 Prospect Avenue, Princeton, NJ 08544 USA

Email: Melinda@princeton.edu
Web: http://pucc.princeton.edu/~Melinda
Telephone: (609) 258-6016

SHARE 94, Session 9109
March 2000


I. INTRODUCTION

The aspiring "master plumber" can go for years without needing to understand the arcana of pipeline initiation and termination. Ultimately, however, when one begins to write very complex CMS Pipelines applications, life becomes easier if one understands in some detail how pipelines start and stop. Every journeyman plumber should understand pipeline initiation and termination well enough to be able to write pipelines that terminate easily and cleanly even when they are waiting on multiple events outside the virtual machine, no matter what resources they may have allocated nor how early in the pipeline initialization process they may terminate. Fortunately, once one understands the pipeline commit process and end-of-file propagation, writing pipelines that terminate cleanly and easily becomes an essentially trivial exercise.

In this discussion of pipeline initiation and termination, I will be borrowing heavily from John Hartmann's paper CMS Pipelines Explained, as that is still the canonical work for understanding CMS Pipelines.

II. PIPELINE INITIATION

Starting up a pipeline consists of three main steps:

1. Parsing the pipeline specification to determine the stages to run;

2.
Performing the syntax checking for the arguments of each stage; and

3. Bumping the pipeline through the commit process until every stage has reached commit level 0, at which time data begin flowing through the pipeline.

We will discuss each of these steps in detail.

Parsing the Pipeline Specification

A pipeline specification can be the argument string on a PIPE command or a "CALLPIPE" or "ADDPIPE" subcommand. It can also be a record read by a "runpipe" stage. The pipeline specification is parsed by the same routine in all of these cases.

The pipeline specification parser (or informally: the scanner) scans the pipeline specification... to identify programs to run and the connections between these programs.(1)

When the scanner is finished, it has built the control block structure describing the programs to run and the connections between them.

To process [a] pipeline, the scanner first counts the number of stage separators [and pipeline end characters] to determine the number of stages. Each stage is described by the string between two stage separators (or to an end of the pipeline if the stage is first or last). The first blank-delimited word in each stage is the name of the program to run; the remainder of the string is the argument string to the particular invocation of the program.

To resolve the name, the scanner looks first in its directories of built-in programs; if it does not find the program there, it looks for a file with file type REXX; such a file is assumed to be a pipeline program written in the Procedures Language/REXX.

The directories of built-in programs (the "entry-point tables") are built when the main pipeline module initializes. This is the order of the programs in the entry-point tables:

1.
Programs contained in the PIPPTFF filter package;

2. Programs contained in the main pipeline module (usually called PIPELINE MODULE);

3. Programs contained in the PIPSYSF, PIPLOCF, and PIPUSERF filter packages (in that order); and

4. Programs contained in any other filter packages that have installed themselves in the Pipelines namespace (in the order in which they were installed).

Note particularly that any stages contained in PIPPTFF will override stages of the same name in the main pipeline module and that stages in the main module or any installed filter packages will override REXX stages of the same name. Because of this latter behavior, master plumbers generally recommend that any REXX filter used in a production application be specified as the explicit argument of a "rexx" stage, to prevent the application from failing years later when a new stage of the same name shows up in the main pipeline module or in a user filter package. A number of times over the years, however, canny plumbers have taken advantage of the precedence of built-ins over REXX filters to ensure that their REXX filters would be used only until the built-in of the same name became available. By writing their REXX filter to the same interface as a new filter built into a future release, they have assured themselves of an automatic performance improvement once their users gained access to that future release.

--------------------
(1) All quotes not otherwise marked are from John Hartmann's CMS Pipelines Explained.

Pipeline Syntax Checking

When the scanner has resolved all the programs to run, it checks the syntax of the argument strings to built-in programs.
Some checks are decided by inspecting the program descriptor for the built-in program; other checks are performed by calling the syntax routine that the program descriptor specifies. For example, the program descriptor for the "<" program specifies that it must be the first stage in a pipeline; its syntax routine may verify that there are two or three words in the argument string and that the specified file exists.

The program descriptors are defined by FPEP or PIPDESC macros (FPEP is now deprecated). Program descriptors can be complex, as this descriptor for the "<" built-in program will illustrate:

+----------------------------------------------------------------------+
|                                                                      |
| &MODULE.RO FPEP FLAGS=FPMUSTARG+FPNOFPR+FPFIRST,                   * |
|                 STREAMS=1,                                         * |
|                 ABOVE=NO,                  Must be below           * |
|                 SYNTROUT=DISKBEG,                                  * |
|                 HLL=ASM,                   Assembler               * |
|                 SYNTAX=((:0,0),(=0,6),     Indicate must exist     * |
|                 -FNTINIT,WORD,?Z,(INCR1,8,CVT,UTKN),-PROCFM,       * |
|                 (:0,1),                    Flag for ADT wanted     * |
|                 CALLSYNT,DONE)             Process                   |
|                                                                      |
+----------------------------------------------------------------------+

Checking the syntax of the arguments to all of the stages before any stage begins executing has obvious advantages, so you should use the syntax-checking facilities of CMS Pipelines when you write an Assembler stage. Using the PIPDESC macro, you can specify the maximum number of streams for the stage, whether it must be or can be or cannot be a first stage, and whether it may have, must have, or must not have arguments. To do more syntax checking than that, you need to write a syntax exit. Writing a syntax exit for a pipeline stage is not difficult, especially if you take advantage of the string-parsing macros available to the Pipelines programmer.
Writing syntax exits for Assembler stages is discussed briefly in John Hartmann's paper Writing Assembler Filters for Use with CMS Pipelines and more comprehensively in his paper Virtual Machine CMS Pipelines Procedures Macro Language.(2)

Alternatively, you can write a syntax program inline in the PIPDESC or FPEP macro, as has been done in the SYNTAX= parameter in the program descriptor for the "<" stage, but that is somewhat more challenging. The inline syntax programs are written for an abstract machine described by its author as "a stored-program, single instruction stream, single data stream, non-von Neumann machine". You will find this abstract machine described in the two papers just mentioned.

When writing a syntax exit, keep in mind that a syntax exit must not allocate any resources. If the syntax check for any stage exits with a non-zero return code, the entire pipeline is abandoned. Because none of the stages should have allocated any resources, none of them is expected to have any cleanup to do, so none of them is entered again before the pipeline is abandoned.

--------------------
(2) All papers mentioned in this paper are available from the CMS Pipelines Web page, http://pucc.princeton.edu/~pipeline/.

The Pipeline Commit Process

If all of the syntax checks succeed, the scanner hands the pipeline specification over to the pipeline dispatcher, which begins stepping the pipeline through the commit process.
The commit process is required because there are several things that cannot be done during the scanner's syntax checks:

o  REXX programs have no program descriptor; the scanner cannot check the syntax of the arguments to REXX pipeline programs.

o  Programs that wish to ensure that a particular stream is connected (or not) cannot determine this at the time of the syntax check, because the connections are not connected until the pipeline specification is passed from the scanner to the dispatcher.

o  Programs that use a resource that needs to be de-allocated explicitly cannot allocate such a resource in their syntax routine because they will not be informed if the pipeline specification is abandoned.

The pipeline notion of a commit level was designed to address these problems. [As we shall see later,] for a REXX program, the effect is that the part of the program up to the first read or write operation acts logically as a syntax checking routine; if the REXX program returns with a non-zero return code without having performed a read or a write, then the complete pipeline is abandoned [as it is if one of the built-in programs detects an error during its syntax checking].(3)

The commit level provides a general mechanism to allow unrelated programs to co-ordinate their progress. One use of the commit level mechanism is to allow all stages to validate their argument strings [and allocate the necessary resources] before any stage takes an action that might potentially destroy data, such as erasing a file or writing on a tape. Thus, the pipeline is abandoned if a built-in program detects an error in its arguments or if a REXX program returns with a non-zero return code before reading or writing [or if any stage cannot acquire the resources it needs to operate properly].

The commit level is a number between -2147483647 and +2147483647 inclusive. Each stage is at a particular commit level at any time.
It increases its commit level with the pipeline command "COMMIT". It cannot decrease its commit level. The pipeline specification parser performs an implicit commit when a stage is defined. The program descriptor for a built-in program includes the commit level at which the program begins; selection stages begin at commit level -2; REXX stages begin at commit level -1; by default, other stages begin at commit level 0.

--------------------
(3) CMS Pipelines Explained.

The pipeline dispatcher initiates the stage with the lowest commit level first. When more than one stage begins at a particular commit level, it is unspecified which one runs first. The stages at the lowest commit level run until they complete (exit or return) or issue a "COMMIT" pipeline command.

An aggregate return code is associated with a pipeline specification. Initially, the aggregate return code is zero. The aggregate return code for the pipeline specification is updated with the return code as each stage returns. If either number is negative, the aggregate return code is the minimum of the two numbers; otherwise, it is the maximum.

When all stages at the lowest commit level have ended or committed to a higher level, the stages at the next commit level are examined. Stages that would begin at the new commit level are abandoned if the aggregate return code is not zero. For stages that [have already started and] are waiting to commit to the new commit level, the return code for the "COMMIT" pipeline command is set to the aggregate return code; those stages are then made ready to run. The aggregate return code is sampled at the time the pipeline specification is raised to the new commit level.
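The rule for combining return codes is easy to model. Here is a minimal Python sketch of it (illustrative only; CMS Pipelines is, of course, not written in Python):

```python
def aggregate(current, stage_rc):
    """Fold a stage's return code into the aggregate return code.

    If either number is negative, the result is the minimum of the
    two; otherwise it is the maximum."""
    if current < 0 or stage_rc < 0:
        return min(current, stage_rc)
    return max(current, stage_rc)

# The aggregate starts at zero and is updated as each stage returns.
agg = 0
for rc in [0, 4, 0]:      # say, three stages: one warning, two clean
    agg = aggregate(agg, rc)
print(agg)                # 4
```

One consequence of the rule is that a single negative return code dominates any positive one, which fits the convention described below: a stage that quits deliberately after a failed "COMMIT" exits with return code 0 and so leaves the aggregate return code showing the original failure.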
All stages committing to a particular level see the same return code, even if one of them subsequently returns with a non-zero return code before another stage has begun to run at the new level.

A stage can inspect the "COMMIT" return code and perform whatever action is required; built-in programs deallocate resources they have allocated and return with return code zero when the "COMMIT" return code is not zero, thus quitting when they determine that another stage has failed.

By convention, all built-in programs process data on commit level 0.(4)

The beginning commit level for each of the built-in programs is specified in its help file, as are the conditions which must be met before it will raise its commit level to 0. For example, the help file for the "tcpclient" stage says:

   Commit Level: "tcpclient" starts on commit level -10. It connects
   to the server's port, verifies that its secondary input stream is
   not connected, and then commits to level 0.

--------------------
(4) CMS/TSO Pipelines Author's Edition, SL26-0018.

You will note that while it is at a negative commit level, "tcpclient" verifies both that it has been invoked with a legal stream configuration and that it can establish a network connection to the specified TCP/IP server. Unless both of these requirements have been met, there is no point in allowing data to flow through the pipeline (and possibly do damage), as the pipeline cannot possibly complete successfully, so both requirements are checked before the pipeline reaches commit level 0.

Now, let's look at the steps in the commit process for a simple pipeline:

   "pipe < input file | locate /pipe/ | rexx yourprog"

It reads an input file, selects lines, and passes the lines on to one of your REXX programs.
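As a preview of the stepping described next, the dispatcher's commit bookkeeping for this three-stage pipeline can be modeled in a few lines of Python (an illustrative sketch, not the real dispatcher; the starting levels are the ones shown in Figure 28, and each stage is assumed to validate successfully and commit straight to 0):

```python
# Starting commit levels for the three stages of
# "pipe < input file | locate /pipe/ | rexx yourprog"
levels = {'<': 0, 'locate': -2, 'yourprog': -1}

order = []                        # order in which stages get control
while min(levels.values()) < 0:
    lowest = min(levels.values())
    for stage, level in levels.items():
        if level == lowest:
            order.append(stage)   # stage runs, validates its arguments,
            levels[stage] = 0     # and (in this sketch) commits to 0

print(order)                      # ['locate', 'yourprog']
print(levels)                     # every stage at level 0; data can flow
```

The sketch shows why "locate" gets control before your REXX program even though it appears later in the specification: the dispatcher always advances the stages at the lowest commit level first.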
The three stages are arranged like this when the pipeline specification is handed from the scanner to the dispatcher:

   Figure 28. Initial Commit Levels

       0: | <
      -1: |               yourprog
      -2: |      locate

"<" starts at commit level 0, "locate" starts at commit level -2, and the REXX program starts at commit level -1. The dispatcher sets the current commit level to -2 and starts "locate", which checks its secondary input stream to make sure it is not connected. On seeing that [the input stream] is not connected, "locate" commits to level 0:

   Figure 29. Commit Levels after First Commit

       0: | <    locate
      -1: |               yourprog

Your program is then started. Presumably it reads an input record; this causes an implied commit to level 0. All stages are now at commit level 0 and can call the dispatcher to transport data....

To illustrate a stage that detects an error and exits instead of committing, consider this pipeline:

   "pipe < input file | L: locate /pipe/ | L: | rexx myprog"

"locate" is dispatched at commit level -2 as before; when it now finds that its secondary input stream is connected, it issues a message and terminates with a non-zero return code. This causes the remaining stages to be abandoned.(5)

The Commit Process when ADDPIPE and CALLPIPE are Used

Stages must be at the same commit level for data to pass between them, except when data flow on a connection that has been set up with "ADDPIPE".(6) The pipeline stalls if a stage at one commit level reads or writes a record after the stage at the other side of the connection has issued a "COMMIT" pipeline command to commit to a higher level.(7)

The scope of the commit level is a pipeline specification.
Pipelines added with "ADDPIPE" commit without co-ordinating their commit level with the pipeline that added them. Pipeline specifications that are issued with "CALLPIPE" and contain no connectors (an unconnected pipeline specification) also commit without co-ordination with the caller.

When a pipeline specification that is issued with "CALLPIPE" (and is connected to its caller) increases its commit level, the pipeline dispatcher checks that the commit level for the stage that issued the "CALLPIPE" is at or above the new level requested. When the subroutine would go to a commit level that is higher than the caller's current commit level, the pipeline dispatcher performs an implicit commit for the stage that issued the "CALLPIPE". The subroutine pipeline proceeds only after the caller's commit has completed (that is, only after the commit level of the calling pipeline has been raised to the new level). If the caller is itself in a subroutine pipeline, the new commit level propagates upwards.(8)

--------------------
(5) CMS Pipelines Explained.

(6) "Although a pipeline set that invokes another pipeline specification with the PIPE command or the "runpipe" built-in program become suspended while the new pipeline specification runs (with "runpipe" the two take turns at running as the new pipeline specification produces messages--PIPE suspends the issuing stage until the pipeline specification has run to completion), there is no danger of a deadlock between the added pipeline specification and the issuing stage, because the two pipeline specifications are not connected to one another." CMS Pipelines Explained, page 32.

(7) In the stall trace, a stage that has issued a "COMMIT" command and is waiting to be redispatched at the higher commit level is marked "wait com".

(8) CMS/TSO Pipelines Author's Edition, SL26-0018.
The Commit Process for REXX Pipeline Stages

A REXX pipeline stage begins at commit level -1. The commit level for a REXX stage is automatically raised to level 0 when it first issues an "OUTPUT", "PEEKTO", "READTO", or "SELECT ANYINPUT" pipeline command. Because the pipeline dispatcher raises the commit level automatically, most REXX programs need not be concerned with commit levels. In the usual case, a REXX program validates its arguments before it begins reading and writing data. If it finds an error in its arguments and exits with an error return code before it has used any of the four commands that cause an automatic commit, the pipeline specification will in effect terminate at commit level -1, before data have begun flowing and before other stages have taken any irreversible actions (assuming they adhere to the convention of doing such on commit level 0). On the other hand, if a REXX program finds no error in its arguments and begins to process data by using one of these four commands, the automatic commit is done, suspending that stage until all other stages are ready for data to flow.

In some cases the automatic setting of the commit level for REXX programs may not be suitable. If your REXX program erases files or performs some other irreversible function before it reads or writes, it should first use the "COMMIT" pipeline command to do an explicit commit to level 0 to wait until all other stages have validated their arguments. If the return code on "COMMIT" is not zero, your program should undo any changes it may have made and exit with return code 0 [as illustrated by this example:](9)

--------------------
(9) CMS/TSO Pipelines Author's Edition, SL26-0018.
+----------------------------------------------------------------------+
|                                                                      |
| /*---------------------------------------------------------------*/  |
| /* First, check that my requirements for successful execution    */  |
| /* have been met.                                                */  |
| /*---------------------------------------------------------------*/  |
|                                                                      |
| If Arg(1) \= '' Then Do              /* Invoked with argument?   */  |
|    'ISSUEMSG 112 SAMPLE /'Arg(1)'/'  /* Yes, I don't want one.   */  |
|    Exit RC                                                           |
| End                                                                  |
|                                                                      |
| 'STREAMSTATE INPUT 1'                /* Secondary input connected?*/ |
| If RC >= 0 & RC < 12 Then Do         /* Yes, that won't do at all.*/ |
|    'ISSUEMSG 1196 SAMPLE /1/'                                        |
|    Exit RC                                                           |
| End                                                                  |
|                                                                      |
| Call Alloc                           /* Allocate my resources.   */  |
| If RC <> 0 Then Do                   /* Was that successful?     */  |
|    'ISSUEMSG 1023 SAMPLE'            /* No, say so.              */  |
|    Exit RC                           /* Must abandon the pipeline.*/ |
| End                                                                  |
|                                                                      |
| /*---------------------------------------------------------------*/  |
| /* I have the correct environment.  Commit to 0 to wait until    */  |
| /* all other stages are equally prepared.                        */  |
| /*---------------------------------------------------------------*/  |
|                                                                      |
| 'COMMIT 0'                           /* Wait til others are ready.*/ |
|                                                                      |
| If RC <> 0 Then Do                   /* Was somebody else unlucky?*/ |
|    Call Dealloc                      /* Yes, give back resources. */ |
|    Exit 0                            /* I didn't find any problems*/ |
| End                                                                  |
|                                                                      |
| /*---------------------------------------------------------------*/  |
| /* It is now safe for me to make irreversible changes.           */  |
| /*---------------------------------------------------------------*/  |
|                                                                      |
+----------------------------------------------------------------------+

This stage tests for whether it was invoked with arguments and exits with return code 112 if it was.
It uses the "STREAMSTATE" pipeline command to determine whether its secondary input stream is connected and exits with return code 1196 if it is. It then calls its allocation routine (not shown) to allocate the resources it needs for successful execution. If that routine fails, this stage exits with return code 1023, still at commit level -1, thus causing the pipeline to be abandoned.

If its allocation routine succeeds, this stage is ready to process data. It commits to 0 to wait until the rest of the pipeline is also ready. If the return code from its "COMMIT 0" command is non-zero, it terminates "quietly". (It exits with return code 0 because it has discovered no error itself.) Note, however, that it is careful to deallocate its resources before exiting.

If the return code from its "COMMIT 0" command is 0, this stage can assume that all other stages have determined that they have been invoked correctly and that the resources they require to execute are available. Thus, it is free to take irreversible actions.

You should use a similar approach in your own REXX programs. Before committing to 0, they should not only check that they have been invoked legally, as in the example above, but they should also determine whether they can acquire all resources they require. They should allocate those resources while still at commit level -1, and they should be prepared to deallocate them if the "COMMIT 0" command completes with a non-zero return code.

If your REXX program needs to use any of the commands that cause an automatic commit before it is ready to commit to level 0, it must issue the "NOCOMMIT" pipeline command to disable the automatic commit and then later issue an explicit "COMMIT".
To perform read or write operations on commit level -1 (to read a parameter file, for example), use "ADDPIPE" to connect the input or output stream (or both) to your REXX stage. (You cannot use "CALLPIPE" for this, because it would force a commit to level 0 before data could flow.) Having defined the new streams with "ADDPIPE", use "READTO" and "OUTPUT" to read and write. When you are finished, issue "SEVER" to restore the original connection. Then issue "COMMIT" to perform an explicit commit. Check the return code on the "COMMIT" before reading or writing the original stream.(10)

We will see an example of doing all this later.

--------------------
(10) CMS/TSO Pipelines Author's Edition, SL26-0018.

Dispatching Order within a Commit Level

When all of the stages have committed to 0, the pipeline can run; that is, the stages can start moving data under the control of the dispatcher. As was mentioned earlier, the order of dispatching at any commit level is unspecified. That includes commit level 0. Unwary plumbers tend to fall into the trap of assuming that the stages in their pipelines will be run in the order in which they occur in the pipeline specification. It ain't necessarily so!

Because this is such an easy trap to fall into, I think it is worth walking through an example in which a programmer assumed the order of dispatching. Last year, the folks in Endicott decided to do a major rewrite of CALENDAR REXX. Their new version contained a "CALLPIPE" command of this form:

+----------------------------------------------------------------------+
|                                                                      |
| 'CALLPIPE (endchar ?)',                                              |
|    'literal ... |',                                                  |
|     . . .                                                            |
|    'y: faninany |',                                                  |
|    '*:',                                                             |
| '?',                                                                 |
|    'literal ... |',                                                  |
|     . . .                                                            |
|    'y:',                                                             |
| '?',                                                                 |
|    'literal ... |',                                                  |
|     . . .                                                            |
|    'y:'                                                              |
|                                                                      |
+----------------------------------------------------------------------+

Each pipeline segment formats the calendar for a single month and then feeds it to an input of the "faninany" stage, which gathers the monthly calendars together and writes them to the output stream. As you will see, there is an assumption that the output of the first "literal" stage will arrive at the input of the "faninany" before the output of the second and third "literal" stages and that the output of the second will arrive before the output of the third. This program will produce correct results only if that is true. In other words, it is dependent upon the dispatching order; it is assuming that the stages will run in a specific order.

Much of the time, it actually works as expected. Note, however, that there is nothing at all to prevent the dispatcher from starting up the second or third "literal" stage before it starts up the first one; the stages can produce output as soon as they are started; and the "faninany" will be equally happy to read from any of its input streams first. So, if the second or third "literal" stage happens to run and produce output before the first one, this pipeline will produce bad results.

This CALENDAR REXX can be made to fail very simply, as a number of folks who were using the "calendar" stage in a Web page found when they picked up the new version in the service stream.

   "callpipe calendar | *:"

worked as expected, but

   "callpipe calendar | literal | *:"

did not. Simply inserting a "literal" stage into the pipeline after the "calendar" stage perturbed the dispatching order sufficiently to change the order in which the three "literal" stages were run, so CALENDAR REXX produced badly formatted results when used on a Web page. The fix, of course, was to change the "faninany" to "fanin" to enforce the order in which the input streams were read, regardless of the order in which the "literal" stages were dispatched.

This is a good example to keep in mind, as it really brings home the fact that the dispatching order is unspecified within a commit level. "When more than one stage begins at a particular commit level, it is unspecified which one runs first."

Committing to Levels Beyond 0

When a stage commits to zero it is assured that all stages are prepared to process data; it can raise its commit level to 1 to determine whether all data were processed correctly.

The commit process continues beyond level 0. After all of the stages running at commit level 0 have returned to the dispatcher or have committed to a level higher than 0, the dispatcher dispatches a stage at the lowest remaining level. Only one of the CMS Pipelines built-in programs commits beyond level 0.(11) There are rare circumstances where you might find doing that to be useful, too, when you need to take some action only after all other stages have completed their processing successfully.

An example of this can be found in my pipeline measurement tool RITA. RITA measures its subject pipeline by running it through a pipeline of this form:

   "PIPE (name Rita) storage . . . | prerita | runpipe events | . . . | rita | . . ."
The "storage" stage loads the pipeline specification to be measured into the Rita pipeline and writes it as a single record that becomes the input to the "runpipe events" stage. "runpipe events" executes the subject pipeline, producing "event records" that are fed to its output stream, where they are analyzed by the "rita" stage.

The "rita" stage expects to receive Pipelines accounting records in addition to the event records. Pipelines accounting is turned on or off by manipulating the Pipelines "msglevel" setting. Accounting records will be produced for any pipeline set started while the msglevel setting includes the x'2000' bit. So, Pipelines accounting needs to be on when "runpipe" starts running the subject pipeline. However, accounting should not be on when the Rita pipeline is started up, because we don't want to measure it, nor should accounting be left on after the Rita pipeline has completed.

--------------------
(11) The ">sfs SAFE" stage performs a pipeline commit to level 1 before it returns the unit of work. It rolls back the unit of work if its "COMMIT 1" command does not complete with return code 0.

The "prerita" stage is a simple routine to turn Pipelines accounting on and off at the appropriate times by setting and restoring the Pipelines msglevel.(12) The msglevel must be set to a value that includes the x'2000' bit before the subject pipeline has started running. The original msglevel setting should be restored after the subject pipeline has run to completion. An easy way to enforce the required timing is for "prerita" to set the msglevel before the record containing the subject pipeline can reach the "runpipe" stage and to restore the msglevel after the "runpipe" stage has finished running the subject pipeline.
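The bit arithmetic involved is simple. A Python sketch of the accounting-bit test (illustrative only; the base value 15, that is 8+4+2+1, is the ordinary default-message setting used in the "prerita" example):

```python
ACCOUNTING = 0x2000        # the x'2000' accounting bit, 8192 decimal

original = 15              # suppose the user's msglevel was 8+4+2+1
new_level = original | ACCOUNTING

print(new_level)                       # 8207, i.e. 8192+8+4+2+1
print(bool(new_level & ACCOUNTING))    # True: accounting records flow
print(bool(original & ACCOUNTING))     # False: none for Rita itself
```

This is why any pipeline set started while the msglevel is 8207 produces accounting records, while the Rita pipeline, started under the original msglevel, produces none.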
To ensure that it gets the new msglevel set before "runpipe" can start
up the subject pipeline, "prerita" issues the PIPMOD MSGLEVEL command
to turn accounting on before it lets the pipeline specification record
pass from its input to its output.  To ensure that it doesn't turn
accounting off until after "runpipe" has run the subject pipeline,
"prerita" commits to the maximum positive value before it issues the
PIPMOD MSGLEVEL command to restore the original msglevel:

+----------------------------------------------------------------------+
|                                                                      |
|  PreRita:   /* Tell Pipelines to produce accounting messages. */     |
|                                                                      |
|  'CALLPIPE (name GetMsgLevel)',                                      |
|     'query msglevel |',     /* Get the original msglevel.  */        |
|     'nfind FRE:|',          /* (In case storage tracing on.) */      |
|     'spec 3.2 c2d 1 |',     /* Second halfword is msglevel. */       |
|     'var msglevel'          /* Store in REXX environment.  */        |
|                                                                      |
|  Address Command 'PIPMOD MSGLEVEL 8207'     /* 8192+8+4+2+1. */      |
|                                                                      |
|  'SHORT'                    /* Connect input to output.  */          |
|                                                                      |
|  Trace Off                                                           |
|  'COMMIT 2147483647'        /* Wait til measured pipe done. */       |
|  Trace Normal                                                        |
|                                                                      |
|  Address Command 'PIPMOD MSGLEVEL' msglevel /* Restore setting. */   |
|                                                                      |
|  Exit                                                                |
|                                                                      |
+----------------------------------------------------------------------+

--------------------
(12) "prerita" is used only when the version of CMS Pipelines is so
old that it does not support the "msglevel" keyword on "runpipe".

"prerita" issues a "CALLPIPE" to query and save the user's msglevel
setting.  It then sets the new msglevel to tell Pipelines to produce
accounting records.
Once the msglevel has been changed to the desired value, it issues a
"SHORT" command to connect its input stream to its output stream, thus
allowing the record containing the subject pipeline specification to
pass from the "storage" stage to the "runpipe" stage, which will now
run the subject pipeline at the desired msglevel setting.  Thus, while
"runpipe" is running the subject pipeline, Pipelines is producing
accounting records that are written to the output of the "runpipe"
stage to be analyzed along with the event records, but no accounting
records are produced for the Rita pipeline itself, which had started
before the msglevel was changed.

After "prerita" has shorted its streams, it issues a "COMMIT
2147483647" command to wait for the appropriate time to restore the
msglevel.  Ultimately, the subject pipeline will be run to completion,
and the "runpipe" stage and all of the other stages will return to the
dispatcher.  The dispatcher will then look for work to do and find that
there are no stages remaining to be dispatched on level 0, so it will
raise the commit level and find that it can dispatch "prerita" again,
at which time "prerita" will issue the PIPMOD MSGLEVEL command to
restore the original msglevel setting.  "prerita" will then return to
the dispatcher itself, allowing the pipeline to terminate.

That version of "prerita" works correctly, but it is overkill.  The
trick with "SHORT" is not necessary.  "prerita" doesn't need to hold up
the pipeline specification record until it has the environment for
"runpipe" set correctly for producing accounting records.  In fact,
"prerita" could be in a separate pipeline segment with no streams
connected to the rest of the Rita pipeline:

   "PIPE (endchar ? name Rita) storage . . . | runpipe events | . . . | rita | . . . ? prerita"

And it wouldn't need the "SHORT" command, if it had no input or output
connected:

+----------------------------------------------------------------------+
|                                                                      |
|  PreRita:   /* Tell Pipelines to produce accounting messages. */     |
|                                                                      |
|  'CALLPIPE (name GetMsgLevel)', /* This can run before COMMIT 0. */  |
|     'query msglevel |',     /* Get the original msglevel.  */        |
|     'nfind FRE:|',          /* (In case storage tracing on.) */      |
|     'spec 3.2 c2d 1 |',     /* Second halfword is msglevel. */       |
|     'var msglevel'          /* Store in REXX environment.  */        |
|                                                                      |
|  Address Command 'PIPMOD MSGLEVEL 8207'     /* 8192+8+4+2+1. */      |
|                                                                      |
|  Trace Off                                                           |
|  'COMMIT 2147483647'        /* Wait til measured pipe done. */       |
|  Trace Normal                                                        |
|                                                                      |
|  Address Command 'PIPMOD MSGLEVEL' msglevel /* Restore setting. */   |
|                                                                      |
|  Exit                                                                |
|                                                                      |
+----------------------------------------------------------------------+

Why does this work?  If we were to trace the Rita pipeline, we would
see that the commands up to and including the "COMMIT 2147483647" are
issued while the pipeline is at commit level -1, so there is no danger
that "runpipe" will read the pipeline specification record before the
PIPMOD MSGLEVEL 8207 command is issued.  Because the subroutine
pipeline created by the "CALLPIPE" command has no streams connected to
the calling pipeline, it does not co-ordinate its commit process with
that of the calling pipeline.  The subroutine pipeline commits to 0 and
runs while this REXX stage and the rest of the Rita pipeline are still
at commit level -1.  This "prerita" is not automatically committed to
level 0 as a result of issuing the "CALLPIPE" command, as it would be
if the "CALLPIPE" used a connector.

Thus, this version of "prerita", like the previous one, runs at commit
level -1 until it has issued the "COMMIT 2147483647" command.
It is guaranteed to set the msglevel to turn on accounting well before
the record containing the subject pipeline specification could arrive
at the "runpipe" stage, even though this stage and the "runpipe" are in
separate pipeline segments.  This is because we are guaranteed that all
stages in a pipeline specification will complete the portion of their
processing that is done at commit level -1 before records begin to flow
through the pipeline at commit level 0.  This is true even when
multiple segments of a pipeline specification have no connections to
one another.  The dispatching order we need in this case will be
enforced by the commit process without further effort on our part.

Review of Commit Process for a REXX Stage
_________________________________________

To review the commit process for a REXX stage, let's look at an
alternative implementation of the "prerita" stage.  Like the variant
we've just looked at, this one could occur anywhere in the Rita
pipeline.  It relies solely on commit levels to get the timing right:

+----------------------------------------------------------------------+
|                                                                      |
|  PreRita:   /* Tell Pipelines to produce accounting messages. */     |
|                                                                      |
|  'ADDPIPE (name GetMsgLevel)',  /* This is a prefixed ADDPIPE. */    |
|     'query msglevel |',     /* Get the original msglevel.  */        |
|     'nfind FRE:|',          /* (In case storage tracing on.) */      |
|     'spec 3.2 c2d 1 |',     /* Second halfword is msglevel. */       |
|     '*.input.0:'            /* Feed into this stage's input. */      |
|                                                                      |
|  'NOCOMMIT'                 /* Let me do a READTO before...  */      |
|                             /* ...the pipeline commits to 0. */      |
|  'READTO msglevel'          /* Get msglevel from ADDPIPE.  */        |
|                                                                      |
|  Address Command 'PIPMOD MSGLEVEL 8207'     /* 8192+8+4+2+1. */      |
|                                                                      |
|  Trace Off                                                           |
|  'COMMIT 2147483647'        /* Wait til measured pipe done. */       |
|  Trace Normal                                                        |
|                                                                      |
|  Address Command 'PIPMOD MSGLEVEL' msglevel /* Restore setting. */   |
|                                                                      |
|  Exit                                                                |
|                                                                      |
+----------------------------------------------------------------------+

Like other REXX filters, this "prerita" is entered at commit level -1.
It takes advantage of that fact to save and set the msglevel while the
pipeline is still at level -1.  Because records won't flow through the
pipeline until it reaches commit level 0, setting the required msglevel
while the pipeline is at level -1 ensures that the msglevel is set
properly by the time the record containing the pipeline specification
to be measured reaches the "runpipe" stage somewhere else in the Rita
pipeline specification.

This alternative "prerita" queries and saves the msglevel setting using
an "ADDPIPE" rather than a "CALLPIPE".  You will recall that a "var"
stage in a pipeline created with "ADDPIPE" cannot store into a REXX
filter; it always stores into the REXX environment in which the PIPE
command was issued.  So, this "ADDPIPE" does not use a "var" stage to
save the msglevel setting.  Instead, it writes the msglevel value to an
output stream that is connected to the input side of this stage.  (In
other words, the "ADDPIPE" here is configured as a "prefixed
"ADDPIPE"".)  You will also recall that data can flow between a REXX
filter and a pipeline created with "ADDPIPE" before the calling
pipeline is committed to level 0, so this "ADDPIPE" can be run while
the pipeline as a whole is still on commit level -1, even though it
uses a connector.

If the REXX code in "prerita" were to issue a "READTO" command to read
from the input stream, however, "prerita" would, by default, be raised
to commit level 0 automatically.
That is, a "COMMIT 0" command would be issued on its behalf, forcing it
to wait to regain control until the rest of the pipeline had reached
commit level 0 (and data were flowing in the pipeline).  That might be
too late, of course; the pipeline specification record might then reach
the "runpipe" stage before "prerita" had regained control to set the
msglevel.  To avoid that, "prerita" issues a "NOCOMMIT" command before
it issues the "READTO" command to read the msglevel record from the
added pipeline.  Thus, while the pipeline as a whole is still at commit
level -1, "prerita" can issue the PIPMOD MSGLEVEL command to set the
desired msglevel.

Then, while still at commit level -1, it issues the "COMMIT 2147483647"
command to wait for the appropriate time to restore the msglevel, as
before.

Counter Example
_______________

Having shown how the "prerita" stage could ensure the timing it needed
for restoring the msglevel by committing to a level above 0, I am now
forced to point out that it could have gotten the right timing more
traditionally (without worrying about commit levels):

+----------------------------------------------------------------------+
|                                                                      |
|  PreRita:   /* Tell Pipelines to produce accounting messages. */     |
|                                                                      |
|  'CALLPIPE (name GetMsgLevel)',                                      |
|     'query msglevel |',     /* Get the original msglevel.  */        |
|     'nfind FRE:|',          /* (In case storage tracing on.) */      |
|     'spec 3.2 c2d 1 |',     /* Second halfword is msglevel. */       |
|     'var msglevel'          /* Store in REXX environment.  */        |
|                                                                      |
|  Address Command 'PIPMOD MSGLEVEL 8207'     /* 8192+8+4+2+1. */      |
|                                                                      |
|  'PEEKTO record'            /* Read pipeline specification. */       |
|  'OUTPUT' record            /* Write pipeline specification. */      |
|  'READTO'                   /* Consume pipeline spec.  */            |
|                                                                      |
|  Address Command 'PIPMOD MSGLEVEL' msglevel /* Restore setting. */   |
|                                                                      |
|  Exit                                                                |
|                                                                      |
+----------------------------------------------------------------------+

Like our first version, this "prerita" expects to be placed between the
"storage" stage and the "runpipe" stage so that the pipeline
specification record must pass through it.  It uses record flow, rather
than the commit process, to control the timing of its second PIPMOD
MSGLEVEL command.

This "prerita" is, of course, entered at commit level -1.  As before,
it saves and sets the msglevel while the pipeline is still at level -1.
It then issues a "PEEKTO" command, which causes it to commit to level
0.  It reads the pipeline specification record and copies it to its
output, which is connected to the input of the "runpipe" stage.
Issuing the "OUTPUT" command causes "prerita" to become blocked waiting
for its output record to be consumed.  (Its "OUTPUT" command will not
complete until a "READTO" command in the "runpipe" stage has consumed
the pipeline specification record.)  Because "runpipe" does not delay
the record, this "OUTPUT" command will not complete until "runpipe" has
finished executing the subject pipeline.  (The Record Delay section of
the help file for "runpipe" says, ""runpipe" writes all output for an
input record before consuming the input record".)  Thus, by the time
its "OUTPUT" command completes, this "prerita" can safely turn off
Pipelines accounting by restoring the msglevel setting, as "runpipe"
will have finished running the subject pipeline by then.(13)

III. PIPELINE TERMINATION
_________________________

End-of-File Propagation
_______________________

Let's now turn to the subject of pipeline termination.  A pipeline
terminates when all of its stages have terminated.
Stages terminate for a variety of reasons, sometimes because they have
finished doing whatever job they were supposed to do, such as reading a
file or passing along ten records, sometimes because they try to read
more input and discover that there is none, and sometimes because they
try to write more output and discover that there is nobody left to read
it.  When a stage terminates because it can no longer read input or
write output, end-of-file is said to have been propagated.  End-of-file
propagation allows a pipeline to shut down gracefully; we will discuss
it at some length.

We will start with this simple subroutine pipeline, which discards all
of the records from its input stream until it finds a specific marker
record, at which point it terminates:

   "/* Position input at first USER card */"
   "'callpipe *: | tolabel USER | hole'"

--------------------
(13) If you need a review of record delay, see my paper Cramming for
the Journeyman Plumber Exam, Part I: Record Flow in CMS Pipelines.

The records up to the marker are consumed in the subroutine pipeline by
"hole", which sucks up all input records and produces no output.
"tolabel" terminates when it sees the marker record (here a USER card
in a CP directory).

When "tolabel" terminates, it returns to the pipeline dispatcher, thus
making it impossible for "hole" to read any more records, so "hole"
also terminates and returns to the dispatcher, allowing the subroutine
pipeline as a whole to terminate.

As part of the clean-up after a stage has returned to the dispatcher,
the dispatcher goes through the stage's streams and severs those that
are still connected.
The stage at the other side of such a connection now receives
end-of-file; by definition an unconnected stream is at end-of-file.
Once a stream becomes unconnected, no other stage can reach it and data
can never flow on it again.

If a stage is blocked in a read or write for a stream that becomes
unconnected, it receives return code 12 and is made dispatchable.
Subsequently, a stage receives return code 12 immediately if it reads
or writes to an unconnected stream.

It should be no surprise that a device driver at the beginning of a
pipeline (for example, "<") terminates when it has read the file into
the pipeline; the stage after the device driver gets return code 12
when it tries to read and the input stream is not connected....  This
cause[s that] stage to terminate, and so on; thus, the stages propagate
end-of-file (forward in this case).

But some stages can set end-of-file on their input sides as well.  For
example, let us add a stage to the example above:

   "/* Position input at first USER card */"
   "'callpipe *: | xlate upper | tolabel USER | hole'"

The "xlate" stage ensures that the records passed to "tolabel" are in
uppercase.  Watch as a USER record moves through this pipeline:

1.  "xlate" peeks at the record, builds an uppercased record in a
    buffer and writes it to the pipeline.

2.  "tolabel" peeks at the record, determines it has seen what it
    wants, and returns without consuming the input record.

3.  The dispatcher cleans up after "tolabel".  It sets end-of-file for
    "hole".  No doubt, we all expected as much.  But it also sets
    end-of-file on "xlate"'s write!

4.  "xlate" has no reason to continue; no one will ever read its output
    records; it can perform no useful function and, therefore, it
    returns without consuming the USER record, which remains in the
    input pipeline.
Thus, end-of-file can propagate backwards in a pipeline as well.

Here is yet another twist to this concept:

   "/* Position input at first USER card */"
   "'callpipe *: | xlate upper | fromlabel USER '"

"fromlabel" is the converse of "tolabel".  It discards records until it
meets one that starts with the specified string.  The matching record
and the remaining input are passed to the output.  But when "fromlabel"
in this example writes the USER record to its output, it receives
end-of-file immediately (think of the dispatcher waiting to pounce!)
and returns without consuming the USER record.  This, too, causes
end-of-file to propagate backwards....

Recall that device drivers that are neither first nor last in a
pipeline copy the records to the output as well as to a host interface.
Such a program continues to process records until it sees end-of-file
on its input, because its primary task is to access the host interface;
copying records to the following stage is a convenience; the fact that
the output stream is no longer connected has no effect on the host
object being created or modified.

[Some stages propagate end-of-file backwards in some settings but not
in others.  For example, when a "count" stage has only one output
stream,] it reads all its input and produces a single output record
when its input is at end-of-file; thus, it propagates end-of-file only
[forward] from its input side.  "count" can also operate as a "gas
meter", measuring the data flowing through it; it enters this mode when
it finds that it has a secondary output stream.
In the meter mode, "count" propagates end-of-file both ways; that is,
when its primary output stream becomes unconnected, it writes the
current count to its secondary output stream and returns to the
dispatcher without consuming (or counting) the record that caused the
[primary output stream] to be severed.  [So, it propagates end-of-file
backwards.]

Note that the decision on what to do at end-of-file is taken by the
individual program; the pipeline dispatcher does not require or enforce
any particular protocol.  In general, however, the CMS Pipelines
built-in filters propagate end-of-file in both directions, while the
device drivers that are not first in a pipeline propagate end-of-file
only [forward] from their input.

Propagating End-of-File for Real--Summarily Stopable Stages
___________________________________________________________

Propagating end-of-file forwards in a pipeline is seldom a challenge
for the pipeline programmer; after all, when there is no further input,
it is natural to stop.

Propagating end-of-file backwards as quickly as possible is not as easy
as it may seem.  To terminate a program as soon as it is clear that no
further productive work can be accomplished, a change in the semantics
of pipeline commands is needed.  Consider this pipeline segment:

   " . . . | bagvendt | take 1 | . . ."
"bagvendt" is a simple REXX pipeline filter ("bagvendt" is Danish for
"reverse"):

+------------------------------------------------------------------+
|                                                                  |
|  /* BAGVENDT REXX -- Reverse contents of lines in pipeline */    |
|  signal on error                                                 |
|  do forever                   /* Loop through all lines  */      |
|     'peekto data'             /* Read a line into "data" */      |
|     'output' reverse(data)    /* Write the reverse       */      |
|     'readto'                  /* Done with input record  */      |
|  end                                                             |
|  error: exit RC*(RC<>12)                                         |
|                                                                  |
+------------------------------------------------------------------+

Watch as the first record flows through this pipeline segment:

1.  "bagvendt" peeks at the record and writes the reverse of it.  This
    causes "bagvendt" to be blocked waiting for its output to be
    consumed.

2.  "take" peeks the record and passes it on.  Eventually its output
    record is consumed and it consumes its input record.

3.  Both "bagvendt" and "take" can now run.  Let us assume that "take"
    is resumed first; it terminates without doing further pipeline I/O.

4.  "bagvendt" consumes its input record and peeks for the next.

At this point, if "bagvendt"'s output stream is not already severed, it
will be severed very soon; but "bagvendt" does not discover this fact
immediately.  Once it has issued the "PEEKTO" pipeline command, it must
wait for an input record to arrive or the input stream to be at
end-of-file.  It will discover (from the return code on "OUTPUT") that
its output stream is at end-of-file when it produces the next output
record.

Obviously, this is not an entirely satisfactory situation.  "bagvendt"
must do extra work to read the second record, which it will then not be
able to write to its output stream.  More important than the wasted
effort, however, is the fact that end-of-file does not flow backwards
as expeditiously as it might.
That could tend to make a pipeline more difficult to shut down at the
proper time.

Originally, built-in programs had the same problem as "bagvendt", but
in 1991 John Hartmann addressed that problem by defining a new
attribute for built-in programs, specified by including the
STOPABLE=YES parameter in the program descriptor.

[Now,] built-in programs can propagate end-of-file backwards faster
than "bagvendt" can do because they can be defined as summarily
stopable.  Essentially, a summarily stopable program has declared to
the pipeline dispatcher that it may be terminated at any time it is
waiting for input and its input stream or its output stream is severed.
In return, the stage is restricted to use buffers that are declared to
the pipeline dispatcher.(14)

User-written Assembler stages can (and usually should) be made
summarily stopable, too, but in writing a summarily stopable stage you
must keep in mind that it might not regain control after any call to
the CMS Pipelines dispatcher.  (This is why summarily stopable stages
leave buffer management to the dispatcher.)

Later Improvements to End-of-File Propagation
_____________________________________________

Some stages cannot be made summarily stopable.  For example, stages
that access host interfaces often need to release a resource before
they terminate; others, such as "unique LAST" still need to write a
record when they get end-of-file on their input stream.  For such
filters, a facility [was made available in 1996] to modify the
semantics of the "PEEKTO" pipeline command.  Stages [that cannot be
made summarily stopable] can now [request to] receive return code 8 on
"PEEKTO" when their output stream is or becomes severed before the next
input record is available.

The same service was made available to user-written Assembler stages
that cannot be made summarily stopable.
If the program descriptor for a stage specifies STOPABLE=RC8, any
"PEEKTO" command issued by the stage will be terminated with return
code 8 if the stage's output streams become severed.  With this
enhancement in place, end-of-file will be propagated backwards as
quickly as possible; the stage that uses STOPABLE=RC8 will not need to
wait until it has read another record before it can notice that its
output stream has been severed.

--------------------
(14) "The notion of summarily stopable stages was introduced to make
storage management more reliable, because the pipeline dispatcher would
take care of releasing buffers.  However, this was soon extended to
allow faster propagation of end-of-file backwards through filters and
selection stages."  CMS Pipelines Explained, page 37.

Clearly the "rexx" interface itself cannot be made summarily stopable,
because it must unwind the REXX environment before it can return to the
pipeline dispatcher.  Instead, the pipeline command "EOFREPORT ALL" is
used to modify the behavior of "PEEKTO" to set return code 8 when the
output stream is severed.  [This is equivalent to specifying
STOPABLE=RC8 in the program descriptor of an Assembler stage.]
A revised "bagvendt", which propagates end-of-file backwards as fast as
is possible, is shown [here]:

+------------------------------------------------------------------+
|                                                                  |
|  /* BAGVENDT REXX -- Reverse contents of lines in pipeline */    |
|  trace off                                                       |
|  'eofreport all'              /* Prepare to propagate EOF */     |
|  trace failure                                                   |
|  signal on error                                                 |
|  do forever                   /* Loop through all lines  */      |
|     'peekto data'             /* Read a line into "data" */      |
|     'output' reverse(data)    /* Write the reverse       */      |
|     'readto'                  /* Done with input record  */      |
|  end                                                             |
|  error: exit RC*(RC<0)                                           |
|                                                                  |
+------------------------------------------------------------------+

The change from the previous example is the three lines added at the
beginning of the file (lines two through four).  Line three ("EOFREPORT
ALL") enables the new interface.  The second line gives backwards
compatibility; it suppresses REXX trace of the negative return code,
which would be issued if the version of CMS Pipelines being used does
not include support for "EOFREPORT".  The fourth line enables tracing
of subsequent failures.

The last line of the example is amended to cater for the possible
return codes.  "PEEKTO" still returns 12 when the input stream is at
end-of-file, that is, when the stage should propagate end-of-file
forwards.  Thus, any positive return code is now considered OK, rather
than just 12.

All of the built-in stages that could benefit from this dispatcher
enhancement were updated to use it.  The effect is seen particularly in
various timing-sensitive gateway stages, such as "gate", "lookup", and
"predselect".  Because end-of-file now flows through a pipeline more
"quickly" than before, writing complex pipelines that don't stall or
hang is simpler than it used to be.
All REXX stages now can (and should) be written to notice the severing
of an output stream while they are waiting to read input or the
severing of an input stream while they are waiting to write output.  In
the revised "bagvendt", we saw how easy it is to use the enhanced REXX
interface in a single-stream filter.  Even with complex multi-stream
REXX stages, the enhancements to the REXX interface allow user-written
stages to propagate end-of-file in the same way that the built-in
stages do.  The complete REXX interface consists of one new pipeline
command and enhancements to another:

o  "STREAMSTATE" is the pipeline command for querying the state of a
   stage's input and output streams.  "STREAMSTATE" was enhanced to
   support a "SUMMARY" keyword.  The return code from "STREAMSTATE
   SUMMARY" is 0 as long as at least one input stream and one output
   stream are connected; otherwise, it is 8.  This command can be used
   in filters for deciding whether useful work can still be done.

   "STREAMSTATE" was also enhanced to support an "ALL" keyword, which
   is followed by the name of the variable into which the result should
   be placed, e.g., "'streamstate all states'".  The result takes the
   form of a word for each defined stream, each word being in the form
   "RC:RC", where the two return codes are the states of the input and
   output sides of the stream.

o  "EOFREPORT" is a new pipeline command used to specify new behavior
   for some old pipeline commands, as we saw in the revised "bagvendt".
   The commands that do I/O and wait for it to complete are enhanced to
   terminate when a stream other than the one they are waiting on is
   severed.  That is, while your pipeline stage is waiting for I/O on
   one stream, it can detect that another stream has been severed.
   One uses the command "EOFREPORT ALL" in stages that should propagate
   end-of-file backwards immediately (that is, in stages where you want
   the fact that the output has been severed to be discovered before
   the arrival of an input record would have caused the stage to try to
   write to its severed output).  With "EOFREPORT ALL" specified, your
   stage stops waiting for an input record when its last output stream
   is severed.  If the last connected output stream is severed while
   your stage is sitting in a "PEEKTO" or "SELECT ANYINPUT" waiting for
   an input record, that "PEEKTO" or "SELECT ANYINPUT" terminates with
   a return code of 8.  You can test for return code 8 and take the
   appropriate action to clean up before exiting, thus propagating
   end-of-file backwards.

   One uses the command "EOFREPORT ANY" in multistream stages that need
   to propagate end-of-file immediately even when they still have an
   input and an output connected.  In addition to the changes caused by
   "EOFREPORT ALL", "EOFREPORT ANY" causes a return code of 4 to be set
   if any other stream is severed during "PEEKTO" or "SELECT ANYINPUT".
   An "OUTPUT" command will terminate with a return code of 4 if
   another stream is severed and the record being written by the
   "OUTPUT" has not yet been peeked by the consumer stage.  (See the
   appendix for a sophisticated example of using "EOFREPORT ANY".)

   The "EOFREPORT" command itself sets a return code of 8 if there is
   not at least one connected input stream and one connected output
   stream.  (That is, it issues a "STREAMSTATE SUMMARY".)

For the rest of this presentation, I will be assuming that you are
using a version of CMS Pipelines that is current enough to support
these dispatcher enhancements and that your locally-written pipeline
stages use "EOFREPORT".  There is no point in doing the hard stuff with
one hand tied behind your back.
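As an illustration of "STREAMSTATE SUMMARY", here is a sketch (mine,
not a built-in program; the name PASSON is invented for the example) of
a trivial pass-through filter that checks, before handling each record,
whether any useful work remains:

   /* PASSON REXX -- Copy records while useful work remains */
   trace off
   'eofreport all'           /* Prepare to propagate EOF         */
   trace failure
   signal on error
   do forever
      'streamstate summary'  /* RC 8 (no connected input/output  */
                             /* pair left) ends the loop, via    */
                             /* the ERROR condition              */
      'peekto data'          /* Read a line into "data"          */
      'output' data          /* Pass it along                    */
      'readto'               /* Done with input record           */
   end
   error: exit RC*(RC<0)     /* End-of-file either way is normal */

In so simple a filter the "STREAMSTATE SUMMARY" is redundant, because
"PEEKTO" and "OUTPUT" already report end-of-file; the test earns its
keep in a stage that would otherwise start an expensive unit of work
before touching its streams again.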
Using Backward Propagation of End-of-File
_________________________________________

The author's help files for CMS Pipelines define the termination
conditions for each stage very clearly.  With the help files and an
understanding of backward propagation of end-of-file, it becomes simple
to write a pipeline that stops immediately when any one of a number of
possible events occurs.  To do this, you need to use stages that
terminate "prematurely" when their output is severed.  (This is another
way of saying that a stage propagates end-of-file backwards.)  Most
stages do propagate end-of-file backwards, except for the output device
drivers.  (If you need to run an output device driver in a pipe segment
that must propagate end-of-file backwards, invoke it as the argument to
an "eofback" stage.)

For a very simple example, let's look at the pipeline from a PH EXEC.
It queries a CSO server across the network and should stop as soon as
it has received the last line of the server's response, whether the
query succeeds or fails.  The last line of the response from CSO will
start "501:" if no match is found in the database.  If a match is
found, the last line of the response will start "200:Bye!".  The
Premature Termination section of the help file for "tolabel" says,
""tolabel" terminates when it discovers that no output stream is
connected".  With that information, one can construct a pipeline that
will terminate promptly whether or not there is a database match.
Simply using two "tolabel" stages in a row will do the job:

+----------------------------------------------------------------------+
|                                                                      |
|  'PIPE (endchar ? name CallCSO)',         /* Query CSO database.  */ |
|     'literal query' name 'return all |',  /* Format command.      */ |
|     'append literal quit |',              /* Append quit command. */ |
|     'insert x0D25 after |',               /* Suffix CR-LF.        */ |
|     'xlate from 1047 to 819 |',           /* Translate to ASCII.  */ |
|     't: tcpclient' server port 'deblock linend 0A linger 60 |',      |
|     'xlate from 819 to 1047 |',           /* Response to EBCDIC.  */ |
|     'tolabel 501:|',                      /* Quit if not found.   */ |
|     'tolabel 200:Bye!|',                  /* Quit if all done.    */ |
|     'console',                            /* Display response.    */ |
|     '?',                                                             |
|     't: | console'                        /* Display ERRNO, if any.*/|
|                                                                      |
+----------------------------------------------------------------------+

If the first "tolabel" finds its target record, it will terminate,
causing the second to see end-of-file on its input and also terminate.
If the second one finds its target record, it will terminate, causing
the first one to see end-of-file on its output and also terminate.  In
both cases, end-of-file is propagated backwards to "tcpclient", causing
it (and the rest of the pipeline) to terminate.

Here is a slightly more difficult example, from an EXEC running in an
OPERMON virtual machine.  This pipeline waits until a predetermined
time is reached or "stop" is typed on the virtual machine console or
the VMPRF virtual machine SMSGes a command starting with the word
"DELETE":

+----------------------------------------------------------------------+
|                                                                      |
|  'PIPE (endchar ?)',               /* Wait for something to happen:*/|
|     'literal' oclock '|',          /* Time to start/stop monitor. */ |
|     'delay |',                     /* Wait until then.            */ |
|     'specs /Delay expired./ 1 |',  /* Remember why we stopped.    */ |
|     'fin: faninany |',             /* Gather event descriptions.  */ |
|     'take 1 |',                    /* Stop after any one event.   */ |
|     'console |',                   /* Display event description.  */ |
|     'var reason',                  /* And store it in REXX.       */ |
|     '?',                                                             |
|     'immcmd stop|',                /* Wait for "STOP" command.    */ |
|     'specs /Stopped./ 1 |',        /* Remember why we stopped.    */ |
|     'fin:',                        /* Feed reason to FANINANY.    */ |
|     '?',                                                             |
|     'starmsg *MSG |',              /* Wait for *MSG input.        */ |
|     'find 00000004|',              /* Must be an SMSG.            */ |
|     'specs 9-* 1 |',               /* Remove message header.      */ |
|     'find VMPRF___DELETE|',        /* Must be DELETE from VMPRF.  */ |
|     'specs',                       /* Convert to ERASE command.   */ |
|        '/"ERASE/ 1 words 3.2 nextword /A"/ nextword |',              |
|     'fin:'                         /* Feed command to FANINANY.   */ |
|                                                                      |
+----------------------------------------------------------------------+

The secret here is that the segments that process each of the events
turn their event into a single record that they feed to a "faninany |
take 1" sequence.  The "take 1" causes the "faninany" to terminate as
soon as any of the events occurs.  (End-of-file is propagated backwards
from "take 1" to "faninany", because "faninany" terminates prematurely
when its output stream is severed.)  The termination of "faninany"
causes the termination of the three "specs" stages that are feeding
records into it, because "specs", too, terminates prematurely when its
output stream is severed.  The termination of the three "specs" stages
causes the stages that feed records to them also to terminate, because
those are also stages that terminate when their output streams are
severed.  So, ultimately, all three of the stages that are waiting for
external events ("delay", "immcmd", and "starmsg") terminate as the
direct result of the termination of the "take 1" stage.

I think that you will agree that taking advantage of backward
propagation of end-of-file makes a clean, simple way to shut a pipeline
down gracefully.

Solving End-of-File Propagation Problems
________________________________________

When plumbers begin writing really complex pipelines, they sometimes
run into problems with end-of-file propagation.
Either their pipeline doesn't terminate when they want it to, or the whole pipeline terminates when they mean for only a part of it to, or they want to make the pipeline terminate but not until the current transaction is complete. Some of these problems can be rather challenging, but there are tools to help solve them: 0 1. If you need to understand the starting or stopping of the stages in a pipeline, use the "listrc" global option to display a message each time a stage starts or stops. That will tell you a great deal very easily. 0 2. In cases where a pipeline hangs when you want it to stop, "jeremy" can be helpful. (I wrote "jeremy" years ago to help me understand my first complex TCP/IP applications. Because their "tcpxxxxx" stages were waiting for something outside the virtual machine, these pipelines could get themselves into a state in which they wanted to stall but didn't because they were still hoping that something good would come in from the network to solve the problem.) 0 3. I strongly recommend using the "listerr" global option in all production pipelines. What "listerr" does is display a message anytime a stage completes with a non-zero return code. (If your pipeline has a stage that should complete with a non-zero return code, you can exempt it by using the "nolisterr" local option.) I find "listerr" to be particularly useful for analyzing cases of "crippled" pipelines, in which one segment of a complex pipeline has terminated quietly, leaving the other segments running but not performing the full intended function of the pipeline. 0 4. The "stoperror" global option was added recently to make it easier to prevent just such situations. "stoperror" causes the entire pipeline to terminate when any stage terminates with a non-zero return code. All running stages are dispatched with return code -4094 with their streams severed (similarly to what happens when a stall is detected), resulting in the pipeline as a whole terminating. 
Stages that are expected to terminate with non-zero return codes (such as "aggrc" or "state", for example) can be exempted from causing the pipeline to terminate by invoking them with the "nostoperror" local option. "stoperror" is particularly useful in cases where a complex pipeline has segments that are not interconnected and, thus, are not readily susceptible to being shut down by end-of-file propagation from one to the other. 1 0 CMS Pipelines Initiation and Termination Page 29 ------------------------------------------------------------------------ 0 5. The amount of end-of-file propagation for certain stages can be adjusted by specifying a "stop" option. "combine", "gather", and "specs" allow you to specify that they are to terminate if any of their input streams or a certain number of their input streams go to end-of-file. (By default, they terminate when all of their input streams go to end-of-file.) Similarly, "deal", "fanout", and "pipcmd" allow you to specify that they are to terminate if any of their output streams or a certain number of their output streams go to end-of-file. (By default, they terminate when all of their output streams go to end-of-file.) In this pipeline, for example, "combine", which ordinarily stops only when it has read all of the records from both of its input streams, is made to stop as soon as it has read the last record from its primary input stream: 0 +-------------------------------------------------------------------+ | | | 'PIPE (endchar ?)', | | . . . | | 'c: combine stop anyeof AND |', /* Only A$PUB and A$NT. */ | | . . . | | '?', | | 'strliteral xFFFFFFFFFFFFFFFF88 |', /* AND off others. */ | | 'duplicate * |', /* As many as we need. */ | | 'c:' /* Feed to COMBINE. */ | | | +-------------------------------------------------------------------+ 0 Since its secondary input stream has the ability to produce an infinite number of records, stopping before then is a good thing. 0 6. 
The "pipestop" stage is another tool for terminating a pipeline. When a record is read into "pipestop", it posts all ECBs that are being waited on by other stages with the code x'3F', thus forcing all stages that are waiting for external events to terminate. It also sets an indication in the pipeline set that will prevent further waiting for external events. This action affects the entire pipeline set, not just the pipeline specification, and it is irreversible. It is equivalent to issuing the PIPMOD STOP command. Although "pipestop" is sometimes useful in error handling, it should not be considered a normal way to terminate a subroutine pipeline. It is best used only in an emergency, abandon-all-hope situation. 0 A Simple Case: Let's now go through a simple example of working out why + A Simple Case + _____________ a pipeline is hanging rather than terminating. A couple of years ago, I got this note from Marty Zimelis: 0 +----------------------------------------------------------------------+ | | | I've stumbled over something puzzling while attempting to | | determine the output of the "immcmd" stage. The following | | pipeline terminates normally when the entire file has been read: | | | | | 1 0 Page 30 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 | 'PIPE (endchar ?)', | | 'immcmd stop |', | | 'g: gate', | | '?', | | '< some file |', | | 'g: |', | | 'console' (cont.) | | | +----------------------------------------------------------------------+ +----------------------------------------------------------------------+ | | | However, the following modification causes it to suspend after | | passing the file. 
It doesn't terminate until the "immcmd" stage | | is satisfied (i.e., I type "stop" on the command line): | + ____ | | | 'PIPE (endchar ?)', | | 'immcmd stop |', | | 'var stopped |', | | 'g: gate', | | '?', | | '< some file |', | | 'g:|', | | 'console' | | | | I know that "immcmd"'s behavior is atypical, but I can't quite | | figure out why the insertion of a "var" stage would change its | | interaction with "gate". It appears that the key sentence in the | | author's help for "gate" is: | | | | If no record arrives on the primary input stream, "gate" | | terminates normally when all input streams are at | | end-of-file. | | | | Help! Marty | | | +----------------------------------------------------------------------+ 0 The "gate" stage is another tool for stopping a pipeline. The concept behind "gate" is both very simple and very powerful. "gate" can have as many pairs of input and output streams as you like. If it gets a record on any input stream other than the primary, it just copies that record to the corresponding output. If it gets a record on its primary input, it terminates. When it terminates, all of its streams are severed, of course, so now there are little end-of-file "infections" wherever "gate" had a stream, and end-of-file may start propagating forward and backward from those spots. Used carefully, "gate" can easily terminate a large complex pipeline, or (even trickier) terminate only the desired subset of a pipeline. 0 What Marty was doing was using "gate" to interrupt a long-running pipeline segment that read a file and displayed it on the console. When he typed "stop" on the console, the output of the "immcmd" stage 1 0 CMS Pipelines Initiation and Termination Page 31 ------------------------------------------------------------------------ 0 appeared on the primary input of the "gate" stage, which caused "gate" to terminate, severing its secondary input and output streams. 
When that happened, records could no longer flow from the "<" stage to the "console" stage. His typing "stop" on the console interrupted the display of the file on the console. 0 If he didn't type "stop" on the console, the file would eventually finish displaying and the "<" and "console" stages would terminate. When that happened, "gate" would terminate because all of its non-primary input streams were at end-of-file. And, when "gate" terminated, "immcmd"'s output stream would be severed, causing it to terminate, too. That worked just fine until he inserted the "var" stage into his pipeline. After that, his pipeline would no longer shut down when the file finished displaying on the console. He had to type "stop" to get it to stop. Why was that? 0 The first thing I did to try to understand Marty's pipeline was run it with the "listrc" global option on the PIPE command. That caused a console display as each of the stages started up. That was followed by the display of the file from the "console" stage. Then stages started returning to the dispatcher as portions of the pipeline terminated, resulting in this display: 0 +----------------------------------------------------------------------+ | | | FPLDSP020I Stage returned with return code 0. | | FPLMSG003I ... Issued from stage 1 of pipeline 2. | | FPLMSG001I ... Running "< some file". | | FPLDSP020I Stage returned with return code 0. | | FPLMSG003I ... Issued from stage 3 of pipeline 2. | | FPLMSG001I ... Running "console". | | FPLDSP020I Stage returned with return code 0. | | FPLMSG003I ... Issued from stage 3 of pipeline 1. | | FPLMSG001I ... Running "gate". | | | +----------------------------------------------------------------------+ 0 This told me that "<", "console", and "gate" were terminating as expected. "<" terminated when it had finished reading the file. "console" terminated when it saw end-of-file on its input. 
And, as the help file had said it would, "gate" terminated (even without receiving a record on its primary input stream) when it received end-of-file on all of its higher-numbered input streams. 0 But the pipeline was clearly stuck at that point, with "immcmd" and "var" still running, so I typed "pipmod stop" on the virtual machine console and got this display: 1 0 Page 32 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 +----------------------------------------------------------------------+ | | | FPLDSP020I Stage returned with return code 0. | | FPLMSG003I ... Issued from stage 1 of pipeline 1. | | FPLMSG001I ... Running "immcmd STOP". | | FPLDSP020I Stage returned with return code 0. | | FPLMSG003I ... Issued from stage 2 of pipeline 1. | | FPLMSG001I ... Running "var stopped". | | Ready; | | | +----------------------------------------------------------------------+ 0 The PIPMOD STOP command stops any stage that is waiting for an external event. In this case, it stopped "immcmd", which was waiting for console input. When "immcmd" returned to the dispatcher, its output stream was severed, causing "var"'s "PEEKTO" to complete with return code 12. That caused "var" to terminate as well. Once "var" had returned to the dispatcher, the PIPE command could complete. Thus, it would appear that what we are seeing here is that end-of-file can propagate forward from "immcmd" to "var", but it does not propagate backward from "gate" to "var". 0 To get a better look at the problem, I added an "immcmd" stage that could be used to invoke "jeremy": 0 'PIPE (endchar ?)', 'immcmd STOP |', 'var stopped |', 'g: gate', '?', '< some file |', 'g: |', 'console', '?', 'immcmd JEREMY | jeremy | > jeremy output a' 0 Now, the immediate problem with doing this is, of course, that there is nothing to terminate the added "immcmd" stage, so even if the pipeline hadn't been hanging before, it would hang now. But that's OK. 
One just runs the pipeline until it gets to the point where it usually hangs and then one types "jeremy" on the virtual machine console. After pausing long enough to be sure "jeremy" has run to completion, one types "pipmod stop" to terminate any stage waiting for an external event, including the "immcmd jeremy" stage, and that allows the PIPE command to complete. Then one examines JEREMY OUTPUT to figure out the hang: 1 0 CMS Pipelines Initiation and Termination Page 33 ------------------------------------------------------------------------ 0 +----------------------------------------------------------------------+ | | | Pipeline specification <1> | | ? | | immcmd STOP 1.1.1 Wait.ECB: 80000000 | | var stopped 1.1.2 Wait.locate | | g: gate 1.1.3 Returned: dropped | | ? | | < some file 1.2.1 Returned: dropped | | g: | | console 1.2.3 Returned: dropped | | ? | | immcmd JEREMY 1.3.1 Wait.out | | record on output 0: "" | | jeremy 1.3.2 Ready | | > jeremy output a 1.3.3 Wait.locate | | | +----------------------------------------------------------------------+ 0 Ignoring the "jeremy" segment, we see that all but two stages in the pipeline have terminated, just as before. "immcmd STOP" is waiting for console input. "var stopped" is waiting for pipeline input, even though its output stream has obviously been severed, given that "gate" has returned to the dispatcher. Why has "var" not terminated? 0 Remember that "var" is an output device driver. It can do useful work (that is, it can set the value of the REXX variable) even when its output stream is unconnected, so it does not terminate when its output stream is severed. Like all other output device drivers, "var" does not propagate end-of-file backwards.(15) 0 So, the fact that "gate" had terminated and its streams had been severed did not persuade "var" to terminate. In this case, however, Marty wanted "var" to propagate end-of-file backwards, so that both it and "immcmd" would terminate when "gate" did. 
Fortunately, the fix was very simple. All he needed to do was replace "var stopped" with "eofback var stopped", which would cause end-of-file to be propagated backwards from the "var" stage. 0 A Digression on EOFBACK: I've mentioned the "eofback" stage several + A Digression on EOFBACK + ________________________ times now, but haven't really discussed it, so let's do that now. 0 0 -------------------- 0 (15) Note that "var" (without the "tracking" keyword) is slightly atypical of output device drivers in that once it receives a single input record, it sets the REXX variable, copies the input record to its output, and then shorts its input to its output. After that, you no longer need to worry about whether it propagates end-of-file, since it is no longer in the pipeline at all. 1 0 Page 34 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 "eofback" does exactly what it is supposed to do, but that is not always exactly what one may wish it to do. The argument to "eofback" is an output device driver. "eofback" passes each input record to its output stream and waits for the record to be consumed. Only after the record has been consumed does "eofback" pass it to its output device driver to write to its host interface, whatever that may be. "eofback" terminates "prematurely" when it discovers that its output stream is not connected. That is, it propagates end-of-file backwards. 0 You will note that if "eofback" is trying to write an output record at the time it discovers that its output is no longer connected, that record is not sent to the output device driver. This can make good sense. In some situations, one would want only the records that have actually been written to the output stream to be written to the host interface. 
Often, however, that isn't really what one wants, particularly in cases where, for example, one has inserted an "eofback console" stage into a pipeline to try to see the records passing through it. The last record, the one that triggered termination, won't be displayed by the "eofback console" stage. A side effect that can be puzzling is that two consecutive "eofback console" stages will produce their output in reverse order. For such debugging cases and certain others, you may find that you prefer using this REXX filter: 0 +----------------------------------------------------------------------+ | | | /* FASTEOFB REXX: Like "eofback" but writes to device first */ | | | | /* This stage will run the device driver passed as an argument */ | | /* and will propagate end-of-file backwards. It differs from */ | | /* "eofback" in that it writes each record to the device driver */ | | /* *before* it copies the record to the output stream. Thus, */ | | /* the last record "peeked" but not successfully written to the */ | | /* output stream will have been written to the output "device", */ | | /* which is exactly what is wanted in some cases. */ | | | | Parse Arg stage /* Device driver and its arguments. */ | | | | 'CALLPIPE (listerr endchar ? name FastEOFBack)', | | '*.input.0: |', /* Input from caller. */ | | 'o: fanout stop anyeof |', /* Stop when secondary severed. */ | | stage, /* First copy to device driver. */ | | '?', | | 'o: |', /* Second copy to here. */ | | '*.output.0:' /* Output to caller. */ | | | | Exit RC*(RC<>12) /* RC = 0 if end-of-file. */ | | | +----------------------------------------------------------------------+ 0 You will notice the use of the "stop anyeof" option on the "fanout" here to propagate end-of-file backward from the secondary output. 
You should also note that using this stage to drive one of the REXX device drivers, 1 0 CMS Pipelines Initiation and Termination Page 35 ------------------------------------------------------------------------ 0 such as "var" or "stem", will result in the REXX variables being stored into or retrieved from the wrong REXX environment unless you up the REXX variable pool number by 1. 0 As a curiosity, I will mention that there is also an undocumented "noeofback" stage, which passes its input to its output without propagating end-of-file backwards. It is used under the covers in "totarget" and "fromtarget". 0 A Case of Stopping Too Soon: We need also to look at a case of fixing a + A Case of Stopping Too Soon + ___________________________ pipeline that is stopping too soon. One day last year, I received this note from Neale Ferguson: 0 +----------------------------------------------------------------------+ | | | I want to write a PIPE that sends a single record to a server via | | UDP and then receives multiple responses back and terminates when | | it receives a record like $END. How should I do this? | | | +----------------------------------------------------------------------+ 0 His problem was that he wanted to send one UDP message to a server and then get back some number of response lines terminated by a target record. It took us a while to figure out how to prevent the "udp" stage from terminating before all of the response lines had been received. The difficulty is that when "udp" is not the first stage in a pipeline, it terminates as soon as its input stream goes to end-of-file. (This is true even if one uses the "asynchronously" keyword.) The approach we came up with was to keep the input of "udp" connected until the final response line has been received: 0 +----------------------------------------------------------------------+ | | | 'PIPE', | | '| var datagram', /* For UDP Search port. */ | | '| f: fanin', /* Pass record, then wait. 
*/ | | '| udp 0 asynchronously', /* Write to search port. */ | | '| totarget locate /$END/', /* Stay connected until end. */ | | '| stem response.', /* Store the response. */ | | '| hole', /* Don't let UDP see eof yet. */ | | '| f:' /* Secondary input of FANIN. */ | | | +----------------------------------------------------------------------+ 0 The pre-formatted datagram is loaded into the pipeline and written to the input of "fanin", which peeks the datagram record and writes it to "udp". "udp" transmits the datagram and consumes the input record, causing "fanin" also to consume it, at which point "var" terminates and "fanin" switches to its secondary input because it has seen end-of-file on its primary input. "udp" is perfectly happy to continue running, because its input stream is still connected (even though no more records are ever going to be fed through that input). So, "udp" continues 1 0 Page 36 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 running and begins receiving responses across the network and writing them to its output stream. The response lines are passed through "totarget" to "stem" and "hole", which consumes them. Note that "hole" does not write the records to the secondary input of "fanin", but its connection to "fanin" continues to exist as long as "hole" is still running, which means that "fanin" and, thus, "udp" will also continue running. 0 When "totarget" finally sees the $END record, it terminates and end-of-file begins propagating forward and backward in the pipeline. When "totarget" terminates, "stem" sees end-of-file on its input and terminates, causing "hole" to see end-of-file on its input and terminate, causing "fanin" to see end-of-file on its secondary input and terminate. When "totarget" terminates, "udp" potentially sees end-of-file on its output, but it may already have seen end-of-file on its input as a result of the termination of "fanin". 
Either way, "udp" terminates and the pipeline shuts down cleanly. So, we have kept "udp" from terminating before we wanted it to, by preventing end-of-file from propagating to its input stream until we wanted it to. 0 Advanced End-of-File Propagation + Advanced End-of-File Propagation + ________________________________ 0 Now, let's turn to some more advanced examples of end-of-file propagation, all of which delay actual termination until the pipeline has finished processing the current transaction(s). 0 Shutting down after the transaction completes: The BitServe pipeline on + Shutting down after the transaction completes + _____________________________________________ the next page is the central routine of a service machine. It has two input stages, "immcmd" and "starmsg". "starmsg" receives three kinds of messages: 0 1. CP information messages, which become the primary input of the "bitftp" stage and define the work it is to do, 2. SCIF (secondary console interface) messages, which become the secondary input of the "bitftp" stage, and 3. CP MSGes, which become the tertiary input of the "bitftp" stage. (The "domsg" stage responds to MSGes from end users and logs its actions by sending records to the "logger" stage; it passes the MSGes from a driver virtual machine to the "bitftp" stage.) 0 All three kinds of messages can arrive asynchronously, so each of "bitftp"'s input streams has an "elastic" stage to provide a bit of buffering. Several of the stages produce messages that are to be written to a log file. Those messages are gathered up by the "faninany" stage and written to the input of the "logger" stage. 0 The "immcmd CMD" stage accepts a number of subcommands, one of which is QUIT. One could certainly stop this pipeline by simply feeding the "quit" record to a "pipestop" stage. There is a problem with doing that, however, because the "bitftp" stage may be running a transaction that should be allowed to complete before the pipeline terminates. 
1 0 CMS Pipelines Initiation and Termination Page 37 ------------------------------------------------------------------------ 0 "pipestop" would terminate the "starmsg" stage, which would mean that the "bitftp" stage could receive no more SCIF messages. It would, therefore, be unable to complete its transaction, which involves driving a process in another virtual machine. In fact, the "bitftp" stage would be unable to complete its transaction if "starmsg", "nfind", "find", or the second "elastic" stage were terminated, because they form the path through which it receives the SCIF messages. Furthermore, it would be unable to log its transaction if the "faninany" or "logger" stages were terminated too soon. However, by careful control of end-of-file propagation, one can pull the plug delicately and arrange things so that the timing of the shutdown of this pipeline is just right. 0 When the "doimmcmd" stage receives a QUIT subcommand from the "immcmd" stage, it first writes a message to the "logger" stage (via the "faninany" stage), then sends a record to the "gate" stage, and finally terminates. Its termination causes the "immcmd" stage to terminate, because, like most input device drivers, "immcmd" terminates when its output stream has been severed. When "gate" receives the record from "doimmcmd" on its primary input stream, it also terminates, which severs its other input stream and its output stream. The severing of its output stream causes the "elastic" stage that had been connected to that stream also to terminate, severing its connection to the primary input of the "bitftp" stage. The fact that "gate"'s secondary input stream has been severed does not affect the "find" stage, even though its primary output was connected to that stream; "find" still has its secondary output connected, so it continues to run. Similarly, "faninany" continues to run despite the severing of its primary input stream when the "doimmcmd" stage terminated. 
So, the portion of the pipeline that is not needed for completing the current transaction is shut down, and the part that is needed is left running, unaware that the seeds of its destruction have been sowed. 1 0 Page 38 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 +----------------------------------------------------------------------+ | | | 'PIPE (listerr endchar ? name BitServe)', | | 'immcmd CMD|', /* Capture immediate commands. */ | | 'd: rexx doimmcmd |', /* Analyze and process them. */ | | 'f: faninany |', /* Gather records for log file. */ | | 'rexx logger', /* Manage the log files. */ | | '?', | | 'd: |', /* DOIMMCMD's QUIT signal. */ | | 'g: gate strict', /* Terminate when get signal. */ | | '?', | | 'starmsg |', /* Connect to CP *MSG service. */ | | 'u: nfind 00000001|', /* Divert if user/driver MSGes. */ | | 's: find 00000007|', /* Divert SCIF; keep IMSGes. */ | | 'rexx doimsg |', /* Select interesting IMSGes. */ | | 'g: |', /* Run through gateway. */ | | 'elastic |', /* Buffer a few IMSG records. */ | | 'b: rexx bitftp |', /* Run the FTP requests. */ | | 'f:', /* Write them to the log file. */ | | '?', | | 's: |', /* SCIF messages to here. */ | | 'elastic |', /* Buffering needed here, too. */ | | 'b:', /* To BITFTP's secondary input. */ | | '?', | | 'u: |', /* User/driver MSGes to here. */ | | 'm: rexx domsg |', /* Handle some; output others. */ | | 'elastic |', /* Buffering needed here, too. */ | | 'b:' /* To BITFTP's tertiary input. */ | | '?', | | 'm: |', /* Processed MSGes to here. */ | | 'f:' /* Write to the log file. 
*/ | | | |----------------------------------------------------------------------| + + + | | | +------+ +-----+ +---+ +---+ | | |immcmd|--|doimm|--------------------------------------| |--| l | | | +------+ | cmd |----------+ | | | o | | | +-----+ | +----+ +-------+ | | | g | | | +----+ +-----+ +----+ +-|gate| |elastic| +---+ | f | | g | | | |star|--|nfind|----|find|----| |--|(IMSGs)|--| |--| a | | e | | | |msg | | MSG |-+ |IMSG|-+ +----+ +-------+ | b | | n | | r | | | +----+ +-----+ | +----+ | +-------+ | i | | i | +---+ | | | +----------|elastic|--| t | | n | | | | |(SCIFs)| | f | | a | | | | +-------+ | t | | n | | | | +-----+ +-------+ | p | | y | | | | | |------------|elastic|--| | | | | | +-|domsg| | (MSGs)| | | | | | | | | +-------+ +---+ | | | | | |------------------------------| | | | +-----+ +---+ | | | +----------------------------------------------------------------------+ 1 0 CMS Pipelines Initiation and Termination Page 39 ------------------------------------------------------------------------ 0 The remaining stages continue to run while "bitftp" finishes its transaction. It then reads from its primary input stream to get an IMSG describing its next transaction. When it discovers that its primary input has been severed, it terminates, causing end-of-file to propagate backward through the second and third "elastic" stages. The "domsg" stage propagates end-of-file backwards from either of its output streams, so it terminates now, causing end-of-file to propagate backwards to "find", "nfind", and "starmsg". The termination of "domsg" also causes end-of-file to propagate forward to "faninany" and to "logger", which by now has logged the final messages from all of the other stages. The pipeline has now shut down as the result of "cmd quit" being typed on the virtual machine console seconds or even hours earlier. 
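Distilled down to just its QUIT handling, the "doimmcmd" stage follows this pattern. (This is a simplified sketch, not the actual DOIMMCMD REXX, which handles several other subcommands.)

+----------------------------------------------------------------------+
|                                                                      |
| /* Sketch of DOIMMCMD's QUIT logic only (simplified) */              |
|                                                                      |
| 'EOFREPORT ALL'                                                      |
| Do Forever                                                           |
|    'PEEKTO cmd'                /* Next immediate command.  */        |
|    If RC <> 0 Then Leave                                             |
|    Parse Upper Var cmd verb .                                        |
|    If verb = 'QUIT' Then Do                                          |
|       'OUTPUT Shutting down.'  /* Log it, via FANINANY.    */        |
|       'SELECT OUTPUT 1'        /* Secondary: the GATE.     */        |
|       'OUTPUT' cmd             /* Close the gate.          */        |
|       Leave                    /* Exit, severing streams.  */        |
|    End                                                               |
|    . . .                       /* Handle other subcommands.*/        |
|    'READTO'                    /* Consume the command.     */        |
| End                                                                  |
|                                                                      |
| Exit RC*(RC<>12 & RC<>8)                                             |
+----------------------------------------------------------------------+

When this stage exits, its streams are severed: "immcmd", behind it, terminates because its output is gone, while the record already sent to the primary input of "gate" has closed the gate, exactly as described above.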
When this pipeline was first written in the early '90s, shutting it down with the correct timing was much more difficult, because backward propagation of end-of-file was much less powerful than it is today. Back then, there was no way for the termination of the "bitftp" stage to cause end-of-file to propagate all the way back to the "starmsg" stage. The best idea I could come up with was to have "bitftp" issue a PIPMOD STOP command just before it terminated. That terminated "starmsg" and started a cascade of forward propagation of end-of-file after "starmsg"'s output stream was severed.

After the 1996 enhancements for end-of-file propagation, that was all changed. The built-ins used in this pipeline now either are summarily stoppable or use "EOFREPORT" to notice that their output streams have been severed while they are waiting for input; so, they no longer need to wait until they get another input record and try to write it to the severed output stream before realizing that it is time to quit. And it was easy to enhance DOMSG REXX to use "EOFREPORT ANY", so that it could propagate end-of-file backward from either output stream. Now, all that the "bitftp" stage has to do to start the remaining dominoes falling is to return to the dispatcher.

Multiple Segments Passing Through One GATE: TCPSHELL EXEC (see my paper Plumbing the Internet) builds and executes a multi-tasking pipeline that runs a variable number of TCP/IP processes. The TCP/IP processes are dispatched by being passed a client connection request record from a "deal secondary" stage, and they request work to do by feeding a record into a "faninany | elastic" sequence that feeds their request records to the secondary input of that "deal secondary".

The pipeline built by TCPSHELL EXEC supports half a dozen immediate commands useful in a server. 
Of interest here are two of those immediate commands, STOP, which stops the entire server as quickly as possible, and QUIT, which tells the server to shut down after all of the processes have completed their current transaction. 0 The STOP immediate command stops the server quickly by feeding a record into a "pipestop" stage, thus posting the ECBs for all stages that are waiting for external events. That includes all of the other "immcmd" 1 0 Page 40 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 stages (as well as all of the "tcpxxxxx" stages). When one uses the STOP immediate command, the pipeline collapses, aborting all active client sessions. 0 The QUIT immediate command, on the other hand, feeds a record into the primary input of a "gate" stage, causing "gate" to terminate and sever all of its streams. That begins the orderly shutdown of the entire pipeline, as end-of-file starts flowing forward and backward through the pipeline: 0 o All of the other "immcmd" stages have this "gate" on their output side, so they all terminate when the gate closes. ("immcmd" terminates prematurely when its output stream is severed.) 0 o Because "deal secondary" has a stream of the "gate" on its input side, it terminates without dispatching another process after the gate is closed. 1 0 CMS Pipelines Initiation and Termination Page 41 ------------------------------------------------------------------------ 0 +----------------------------------------------------------------------+ | | | 'PIPE (endchar ? listerr name TCPShell)', /* Run the server: */ | | 'immcmd QUIT |', /* QUIT was typed on console. */ | | 'g: gate', /* Don't dispatch any more. */ | | '?', | | 'immcmd STOP |', /* STOP was typed on console. */ | | 'g: |', /* (GATE closed by QUIT cmd.) */ | | 'pipestop', /* Pull all the plugs now. */ | | '?', | | 'immcmd CP |', /* Immediate CP command. */ | | 'g: |', /* (GATE closed by QUIT cmd.) 
*/ | | 'cp', /* Pass command to CP. */ | | '?', | | 'immcmd CMS |', /* Immediate CMS command. */ | | 'g: |', /* (GATE closed by QUIT cmd.) */ | | 'xlate upper |', /* Upper-case the command. */ | | 'command', /* Pass command to CMS. */ | | '?', | | 'immcmd DUMP |', /* DUMP was typed on console. */ | | 'g: |', /* (GATE closed by QUIT cmd.) */ | | 'pipdump |', /* Format a dump. */ | | '>> pipdump listing d', /* Write it to disk. */ | | '?', | | 'immcmd JEREMY |', /* JEREMY was typed on console. */ | | 'g: |', /* (GATE closed by QUIT cmd.) */ | | 'jeremy |', /* Format pipeline status. */ | | 'console', /* Display status. */ | | '?', | | 'l: tcplisten' arg0.server_port, /* Wait for next client. */ | | 'BACKLOG' backlog, | | 'REUSEADDR |', /* Share port with other VMs. */ | | 'g: |', /* (GATE closed by QUIT cmd.) */ | | 'd: deal secondary', /* Clients to ready processes. */ | | '?', | | 'y: faninany |', /* Stream of streamnumber recs. */ | | 'elastic |', /* Hold until ready for them. */ | | 'd: | process1 | y:', /* Give work to process 1. */ | | '?' | | 'd: | process2 | y:', /* Give work to process 2. */ | | '?', | | . . . | | '?', | | 'd: | processN | y:', /* Give work to process N. */ | | '?', | | 'l: |', /* Any TCP/IP ERRNO to here. */ | | 'g: |', /* (GATE closed by QUIT cmd.) */ | | 'nfind 5001|', /* 5001 is not really an error. */ | | 'spec /'self '(tcplisten)/ 1 1-* nw |', /* Identify ERRNO. */ | | 'console' /* Display the ERRNO, if any. 
*/ | | | +----------------------------------------------------------------------+ 1 0 Page 42 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 o Since the "tcplisten" stage has a stream of the "gate" on its output side, it, too, terminates as the result of a QUIT immediate command; no more client connection requests are accepted from the network after the gate is closed.(16) 0 o The termination of "deal secondary" severs the output stream of "elastic", which is connected to the secondary input of "deal secondary", so "elastic" terminates, causing "faninany" to terminate, since it no longer has an output stream connected. 0 o As each server process finishes its current client transaction, it tries to write a record to "faninany" and discovers that its output stream has been severed. That causes it to terminate. 0 Once the last of the server processes finishes its current transaction and terminates, there are no longer any stages running in the TCPShell pipeline, so the PIPE command itself completes. 0 This orderly shutdown process works well, but making it robust was a fiddly business. "gate", like any other stage, becomes blocked any time it tries to write an output record. Thus, there is always the potential that the "gate" stage will become blocked for an extended period when an output record on one of its streams is not consumed "quickly". If that happens, no records will be able to move through the other pipeline segments that run through the "gate" until that output record is consumed. If, for some reason, that output record is never consumed, the pipeline will hang permanently, and a record arriving on "gate"'s primary input will not be noticed. The pipeline will not stall, because a pipeline that has stages waiting on external events has to assume that things may change if a record is received from one of the external interfaces. 
The only way to make sure that such hangs are not a problem with this server is to make sure that each of the paths through "gate" promptly consumes the records it reads from "gate". 0 Let's look at a more concise example of this problem, which uses "delay" to cause a long blockage of a "gate" stage: 0 0 0 0 -------------------- 0 (16) Note that the primary and secondary output of "tcplisten" are both run through the "gate". This was necessary under older levels of CMS Pipelines because originally "tcplisten" did not terminate + ______________ prematurely when only its primary output stream was severed. 1 0 CMS Pipelines Initiation and Termination Page 43 ------------------------------------------------------------------------ 0 +----------------------------------------------------------------------+ | | | 'PIPE (listrc endchar ?)', | | 'immcmd QUIT | g: gate', | | '?', | | 'immcmd ABCD | g: | insert /*/ | console', | | '?', | | 'literal +30 | g: | delay', | | '?', | | 'immcmd WXYZ | g: | insert /*/ | console' | | | +----------------------------------------------------------------------+ 0 If one types "abcd" immediately after starting this pipeline, the asterisk is not displayed for 30 seconds. If one types "quit" immediately after starting this pipeline, none of the stages terminate until after the 30-second delay has expired. In either case, the "gate" stage is blocked writing the "+30" record to the "delay" stage, which has peeked it but will not consume it until the 30 seconds have passed. 
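The blocking behavior is easy to reproduce in any language that has blocking handoffs. In this hypothetical Python sketch (my own analogy, not part of the original pipeline), a size-one queue stands in for one output stream of "gate", and a sleeping thread stands in for the "delay" stage that has peeked the "+30" record but will not consume it yet; the gate thread cannot move any other record until that slot is freed:

```python
import queue
import threading
import time

# Hypothetical sketch (NOT CMS Pipelines): a size-1 queue models one output
# stream of "gate", and a sleeping thread models the "delay" stage that has
# peeked the "+30" record but will not consume it until its wait expires.
to_delay = queue.Queue(maxsize=1)
results = {}
start = time.monotonic()

def gate():
    to_delay.put("+30")            # slot is free: returns immediately
    to_delay.put("next record")    # BLOCKS: "+30" has not been consumed yet
    results["unblocked_after"] = time.monotonic() - start

def delay_stage():
    time.sleep(0.2)                # the 30-second wait, scaled down
    to_delay.get()                 # finally consume "+30", freeing the gate

threads = [threading.Thread(target=gate), threading.Thread(target=delay_stage)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The gate thread's second `put` cannot return until the consumer's `get` frees the slot, so `results["unblocked_after"]` is at least the full 0.2-second delay; in the pipeline, "abcd" and "quit" records are stuck behind that same block.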
0 There is an easier, surer way to spread end-of-file to multiple pipeline segments: 0 +----------------------------------------------------------------------+ | | | 'PIPE (listrc endchar ?)', | | 'immcmd QUIT | o: fanout', | | '?', | | 'o: | take 1 | copy | g1: gate', | | '?', | | 'immcmd ABCD | g1: | insert /*/ | cons', | | '?', | | 'o: | take 1 | copy | g2: gate', | | '?', | | 'literal +30 | g2: | delay', | | '?', | | 'o: | take 1 | copy | g3: gate', | | '?', | | 'immcmd WXYZ | g3: | insert /*/ | cons' | | | +----------------------------------------------------------------------+ 0 In this pipeline, there is a separate "gate" stage for each segment of the pipeline that needs to be terminated. (In other words, none of the "gate" stages has more than two input streams.) When the time comes to pull the plugs, the trigger record is fed into a "fanout" that makes multiple copies, one for each of the "gate" stages. Each copy of the trigger record goes first to a " take 1 | copy" sequence that consumes it quickly, preventing "fanout" from being blocked and allowing "immcmd" and "fanout" to terminate. 1 0 Page 44 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 Since each "gate" stage now normally receives records only on its secondary input, the time never comes when it can't read a record from, say, its tertiary input because it is blocked waiting for a record to be consumed on its quaternary output. Records can pass through all three "gate"s independently of one another. If one types "abcd" immediately after starting this pipeline, the asterisk is displayed instantly. (If one types "quit" immediately after starting this pipeline, all but four of the stages terminate instantly; the others terminate when the 30-second delay is over.) 
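The same trick translates directly into the queue analogy: give each segment its own buffered channel and broadcast the trigger by copying it into every one. A hypothetical Python sketch (segment names invented for illustration), with unbounded queues standing in for the "take 1 | copy" buffers that consume each copy promptly:

```python
import queue

# Hypothetical sketch of the FANOUT fix: each segment gets its own buffered
# channel (playing the role of its "take 1 | copy" buffer), so broadcasting
# the shutdown trigger can never block on a slow segment.
segments = {name: queue.Queue() for name in ("abcd", "delay", "wxyz")}

def fanout(trigger):
    for channel in segments.values():
        channel.put(trigger)         # unbounded queue: put() never blocks

fanout("quit")                       # the QUIT trigger reaches every gate
seen = {name: chan.get_nowait() for name, chan in segments.items()}
```

Because every segment has its own copy waiting in its own buffer, a slow segment (the "delay" path) delays only itself; the broadcast itself completes instantly.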
Doing It Without PIPESTOP or GATE: Over the years, I've become convinced that if I have to use "pipestop" in a pipeline, I haven't thought it through sufficiently, and that the same is probably true if I have to use "gate". This next example, a pipeline called SMTPLite, is a case in point. I struggled for years to get it to terminate cleanly no matter what errors it got from the network. At first, I used a "gate" stage to shut it down when any of several error conditions occurred. Later, I split those up and used two "gate" stages, but even then it would sometimes hang when the network was being particularly perverse. When I finally really thought it through, it became clear that all I needed to do was use straightforward end-of-file propagation to shut the pipeline down. The final version of the pipeline is both simpler and more reliable than before.

SMTPLite is an exercise in both record flow and end-of-file propagation, so it may require a bit of study. It is one process in a multi-processing pipeline similar to the one in our previous example. Its function is to transmit mail files to an SMTP server using the SMTP protocol. It has an input stream through which the calling pipeline gives it work to do (a record containing the spoolid of the next reader file to be read and sent to the SMTP server). It has an output stream through which it requests work to do; each time it is ready for more work, it writes a record containing its process number to that output stream. It handles mail files in a continuous flow, which is more efficient than sipping a single mail file, terminating when that is done, and then starting afresh on the next one. Any time this subroutine pipeline does decide to terminate, it first sends an SMTP QUIT command across the network to the remote SMTP server and waits for the connection to close, as is required by the RFCs covering SMTP. 
In the unusual case that the calling pipeline wants the SMTPLite subroutine to terminate (because of an immediate command), it sends it a signal record through its secondary input stream. 0 The RFCs describe an SMTP transaction in terms of state diagrams, which I found a bit daunting at first. In the end, it was not difficult to "pipethink" the process. Carefully routing records through different segments of a pipeline has much the same effect as making the state transitions described in the RFC. An input is not consumed until the server has replied with a response indicating successful or unsuccessful processing of that input. When that response is consumed, a cascade of record consumption travels backwards until the original input is also consumed, allowing the next input to be processed through a different pipeline segment, in effect changing the state of the pipeline. 1 0 CMS Pipelines Initiation and Termination Page 45 ------------------------------------------------------------------------ 0 The spool files processed by this pipeline are mail files in BSMTP ("Batch SMTP") format. A BSMTP-format file contains a mail file wrapped in the SMTP commands needed to transmit it to an SMTP server. The "getbsmtp" stage invoked in the SMTPLite pipeline receives the specified spool file from the reader and verifies that it is well formed. To achieve the transition between the two main states of sending commands or sending mail, the "getbsmtp" stage first forms its input into two records, one containing all of the commands from the BSMTP envelope and the other containing all of the lines of the mail text. It will write neither of these records until both exist. Once it has both, it sends the first (the commands) through the appropriate pipeline path and then holds the second until the first has been consumed. 
The first record is consumed when the pipeline should make a state transition; this causes the second record (the mail data) to be sent along a different path through the pipeline. The second record is consumed when the pipeline should make the transition back to the initial state.

In the normal case, the dialogue between the SMTP server (S) and the client (C) is as follows:

   S> 220 babybear.Princeton.EDU running IBM VM SMTP V2R2 on . . .
   C> HELO pucc.Princeton.EDU
   S> 250 babybear.Princeton.EDU is my domain name.
   C> MAIL FROM:<...>
   S> 250 OK
   C> RCPT TO:<...>
   S> 250 OK
   C> DATA
   S> 354 Enter mail body.  End by new line with just a '.'
   C> Date: Wed, 06 Jan 93 17:27:54 EST
      From: Melinda Varian
      . . .
   S> 250 Mail Delivered
      . . . (Iterations of sequence between MAIL command
      and "250 Mail Delivered" response) . . .
   C> QUIT
   S> 221 babybear.Princeton.EDU closing connection

When all of the commands from an envelope have been executed successfully, the server sends a 354 response (the prompt to begin sending the mail text). This 354 record is consumed, causing a cascade of record consumption to travel backwards in the pipeline until the single record containing all of the envelope commands is also consumed. Once that record has been consumed, the "getbsmtp" stage is free to produce the single record containing the entire mail text.

When all the lines of the mail text have been sent, the server sends a 250 response. This 250 record is consumed, causing a cascade of record consumption to travel backwards in the pipeline until the single record containing the entire mail text has been consumed. When that record has been consumed, the pipeline consumes the original spoolid record and reverts to the state in which it can begin processing another BSMTP envelope. 
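The client's side of that dialogue reduces to classifying each reply by its first digit: 2yz and 3yz replies mean proceed, while 4yz and 5yz replies divert into the error path. A hypothetical Python sketch (my illustration, replayed against a canned transcript rather than a live SMTP server):

```python
# Hypothetical sketch: the client's view of the normal-case dialogue, replayed
# against a canned transcript instead of a live SMTP server.
transcript = [
    "220 babybear.Princeton.EDU running IBM VM SMTP V2R2",
    "250 babybear.Princeton.EDU is my domain name.",   # reply to HELO
    "250 OK",                                          # reply to MAIL FROM
    "250 OK",                                          # reply to RCPT TO
    "354 Enter mail body.",                            # switch to mail-text state
    "250 Mail Delivered",                              # switch back to commands
    "221 babybear.Princeton.EDU closing connection",
]

def classify(reply):
    """Return the reply code; 4yz/5yz replies divert into the QUIT segment."""
    code = reply[:3]
    if code[0] in "45":
        raise RuntimeError("divert to QUIT segment: " + reply)
    return code

codes = [classify(r) for r in transcript]

try:
    classify("554 Transaction failed")      # a permanent-error reply
except RuntimeError as exc:
    error_path = str(exc)
```

The normal transcript walks straight through; a "5yz" reply is the analogue of a record being diverted by the "nfind 5" stage toward the segment that builds the SMTP QUIT.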
There are a number of error cases, but they are treated much the same, because the RFC requires that the client send an explicit SMTP QUIT command before breaking the network connection for any reason. The errors that are detected include:

1. "getbsmtp" found badly-formed BSMTP.
2. The SMTP server issued a message indicating a transient error ("4yz").
3. The SMTP server issued a message indicating a permanent error ("5yz").
4. A STOP immediate command was typed on the virtual machine console.

When an error case is detected, the record indicating the failure or error state is converted into an SMTP QUIT command that flows back to the SMTP server to close the connection. When the server sends the connection closing message (221), the pipeline terminates due to end-of-file propagation. That allows REXX code in the filter to deal with the error, after which it starts the pipeline up again in the initial state. (TCP/IP errors must also be dealt with, but in the case of an error detected by TCP/IP the network connection has already been broken, so the pipeline simply shuts down.)

As the SMTPLite pipeline (on the next page) sends mail files to the SMTP server, it switches continuously between the states in the state diagram. However, it must be ready to terminate promptly, whatever its state, whenever it detects an error case. It achieves this by careful propagation of end-of-file.

If the "getbsmtp" stage decides that an input file is badly formed, it writes a record to its quaternary output, which feeds into the "a: faninany" stage. Transient and permanent error responses from the SMTP server are indicated by messages beginning with "4" or "5", which are detected by the two "nfind" stages; the "nfind" stages feed the error messages to the same "faninany" stage. 
These three situations are discovered in this pipeline or its subroutines, so the records representing the errors arrive at the "a: faninany" stage synchronously. If a STOP command is entered on the console, the calling pipeline detects it and sends a copy to each of the SMTPLite pipelines. That STOP record may come in through the secondary input from the calling pipeline at any time. It is also fed into the "faninany"; thus, all four of the error cases are handled by the same pipeline segment. 0 The "faninany" is followed by a "take 1" because we don't want to send two QUIT commands to the SMTP server, no matter how many ending conditions we encounter. "copy" consumes the error record and unblocks the cascade of stages that produced it, whichever one that may be. "fasteofb var" saves an error description for later analysis. "spec" converts the error record to an SMTP QUIT command that is then fed across the network to the server via the "callsmtp" stage. When the server returns the 221 message (connection closed), the "tolabel" is satisfied and the pipeline shuts down cleanly. 1 0 CMS Pipelines Initiation and Termination Page 47 ------------------------------------------------------------------------ 0 The other condition in which the connection may be closed (which does not result in a 221 message) is when TCP/IP issues an ERRNO because of a network error. In that case, "callsmtp" writes the ERRNO record to its secondary output and then terminates, which causes the SMTPLite subroutine pipeline to terminate, as end-of-file propagates forward and backwards from the "callsmtp" stage. 1 0 Page 48 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 +----------------------------------------------------------------------+ | | | 'CALLPIPE (endchar ? listerr name SMTPLite'seqno')', | | '*.input.0: |', /* Get the next spool file id. */ | | 'copy |', /* Unblock caller's DEAL. 
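The "faninany | take 1 | copy | spec" discipline (merge every possible ending condition, then act on the first one only) is easy to mimic. A hypothetical Python sketch, with invented event strings standing in for the error records:

```python
import queue

# Hypothetical sketch of "faninany | take 1 | copy | spec": all ending
# conditions merge into one channel, and only the first arrival becomes
# the single SMTP QUIT.
merged = queue.Queue()
merged.put("554 Transaction failed")   # whichever condition fires first wins
merged.put("STOP")                     # later arrivals are never taken

reason = merged.get()                  # "take 1": act on the first record only
smtperr = reason.split()[0]            # like 'spec word 1 1': keep word one
quit_cmd = "QUIT\r\n"                  # the record sent to close the link
```

However many ending conditions race in, exactly one QUIT is built, and the saved first word of the winning record is available afterward for error analysis, as "fasteofb var smtperr" provides in the real pipeline.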
*/ | | 'fasteofb var spoolid 1 tracking |',/* Remember which file.*/ | | 'g: getbsmtp' seqno '|', /* Get file's BSMTP commands. */ | | 'split after string x0D25 |', /* One command per record. */ | | 'y: faninany |', /* Feed inputs to CALLSMTP. */ | | 'c: callsmtp' seqno '|', /* Communicate w/SMTP server. */ | | 'tolabel 221|', /* Terminate when get closing. */ | | 'n: nfind 4|', /* Divert SMTP transient error.*/ | | 'N: nfind 5|', /* Divert SMTP permanent error.*/ | | 'between /354/ /250/ |', /* Before and after mail sent. */ | | 'b: find 354|', /* SMTP is ready to read mail. */ | | 'copy |', /* Unblock GETBSMTP's primary. */ | | 'hole', /* Keep COPY running. */ | | '?', | | 'b: |', /* Here while mail being sent. */ | | 'find 250|', /* SMTP has delivered the mail.*/ | | 'copy |', /* Unblock GETBSMTP's 2ndry. */ | | 'hole', /* Keep COPY running. */ | | '?', | | 'g: |', /* Mail msg data from GETBSMTP.*/ | | 'y:', /* Read it into CALLSMTP. */ | | '?', | | 'g: |', /* Null record from GETBSMTP...*/ | | , /* ... when spool file purged. */ | | 'fasteofb var spoolid 1 tracking |', /* Remember file gone.*/ | | 'insert /'seqno'/ |', /* Record with my streamnumber.*/ | | '*.output.0:', /* Ask for another spool file. */ | | '?', | | 'g: |', /* GETBSMTP couldn't parse file*/ | | 'a: faninany |', /* Any requests to close link. */ | | 'take 1 |', /* Don't send multiple QUITs. */ | | 'copy |', /* Unblock producer of error. */ | | 'spec word 1 1 |', /* Isolate termination reason. */ | | 'fasteofb var smtperr 1 tracking |', /* Store for analysis.*/ | | 'spec /QUIT'crlf'/ 1 |', /* Build SMTP QUIT command. */ | | 'y:', /* Close the SMTP connection. */ | | '?', | | '*.input.1: |', /* STOP was typed on console. */ | | 'a:', /* Remember error; close link. */ | | '?', | | 'n: | a:', /* Remember transient SMTP error; close link. */ | | '?', | | 'N: | a:', /* Remember permanent SMTP error; close link. */ | | '?', | | 'c: |', /* Here if TCP errno/timeout. 
*/ | | 'append literal |', /* In case no TCP/IP error. */ | | 'var errno' /* Save TCP/IP errno, if any. */ | | | +----------------------------------------------------------------------+ 1 0 CMS Pipelines Initiation and Termination Page 49 ------------------------------------------------------------------------ 0 To get this all to work with the correct timing, I found that I had to use "fasteofb", rather than "eofback", because the "var" stages need to set their REXX variables as soon as they read each input record, not after they have copied their input to their output and have waited for it to be consumed. 0 This pipeline shuts down easily only because every stage in it propagates end-of-file backward as quickly as possible.(17) Once one 0 -------------------- 0 (17) Some comments on record flow in this pipeline: 0 SMTPLite's input record contains the spool file id of the next file to be processed. The spoolid is stored in the REXX variable "spoolid" for possible error analysis if the transaction should fail, and the spoolid record is peeked by "getbsmtp", which reads the specified spool file, verifies that it is well-formed BSMTP, and writes it reformatted as two records, one (on its primary output) containing all the lines of the BSMTP envelope, the other (on its secondary output) containing the mail text. 0 The SMTPLite pipeline reads the envelope from "getbsmtp"'s primary output and splits it into individual SMTP commands to feed into the "callsmtp" stage to be sent across the network to the SMTP server. ("callsmtp" is basically a "tcpclient" stage packaged with the appropriate translation and deblocking stages.) "callsmtp" writes the response messages from the server to its output stream for analysis by this pipeline. 0 The normal sequence of responses from SMTP will lead finally to a 354 prompt (authorizing the mail text to be transmitted). 
When the 354 record is received, it is consumed by a "copy" stage, causing the record on the primary output of "getbsmtp" to be consumed, which allows "getbsmtp" to write the mail text record to its secondary output, which is also routed into "callsmtp" (as a single record). The normal response to this is a 250 message (indicating that the mail has been delivered). When the 250 record is received, it is consumed by another "copy" stage, and that allows the subroutine pipeline in "getbsmtp" to terminate. 0 "getbsmtp" purges the spool file and writes a null record to its tertiary output. That record is fed to "fasteofb var spoolid 1 tracking", to clear the REXX variable "spoolid" to remember that there is no active spool file, should there be a subsequent error. The null record is then transformed into a record containing the process number and fed into the calling pipeline to request another spool file to work on. 0 As in the previous example, an individual process (such as this one) is dispatched by a "deal secondary" stage in the calling pipeline. Thus, we need the "copy" at the beginning here to consume "deal"'s output quickly so that "deal" can continue 1 0 Page 50 CMS Pipelines Initiation and Termination ------------------------------------------------------------------------ 0 has a thorough understanding of end-of-file propagation, the whole matter of shutting complex pipelines down gracefully becomes much simpler. 0 0 0 0 0 0 0 0 0 0 0 0 0 -------------------- 0 dispatching other processes. It then becomes important not to consume the output of that first "copy" stage too soon and thereby let another transaction potentially leak in and contaminate the current one. For that reason, "getbsmtp" does not consume the output of that "copy" until it has successfully written the null record to its tertiary output to indicate that the transaction is complete. 
Only when the record on its tertiary output has been consumed by the calling pipeline does "getbsmtp" finally consume the output of the first "copy", freeing "copy" to accept another spoolid record (defining another mail file) from "deal secondary". 1 0 Appendix: Using EOFREPORT ANY Page 51 ------------------------------------------------------------------------ 0 Appendix A + Appendix A 0 USING EOFREPORT ANY + USING EOFREPORT ANY + ___________________ 0 This posting by Steve Hayes of IBM is a sophisticated use of the "EOFREPORT ANY" command that is well worth studying: 0 ----- PIPELINE CFORUM appended at 10:30:08 on 97/04/23 GMT (by TSGSH at GFORD1) Subject: not quite SYNCH Ref: Append at 20:13:02 on 97/04/22 GMT (by GSKNICK at PKMFGVM3) 0 I'd make it an option on "synchronise"--"synchronise loosely"--a suggestion I've never remembered to append before even though I've meant to on many occasions. "synchronise loosely" would ensure that inputs are read in order and outputs are written in order but the two series of actions for each set of records may over-lap. 0 Its order of preference is: 0 1. Output the lowest numbered input record which has been peeked 0 2. Consume the lowest numbered input record which has been output 0 3. Peek the lowest numbered input record which has not been peeked 0 It will attempt to avoid stalls within this by: 0 1. Testing to see which streams are waiting for input and output before applying the above rule 0 2. If no streams are waiting, suspending itself and retrying 0 3. If no streams are waiting and "SUSPEND" gives RC 0, using the above rule without testing the state of the stream. 0 It would differ from the current behaviour in that it cannot know if a full set of input records will be received and so it may output a partial set in more circumstances than ordinary "synchronise" does. "EOFREPORT" should minimise the effect of this. 0 A REXX implementation is given below. 
0 Steve Hayes IBM Global Services: Network Services EMEA SNA I&S, Greenock, UK TSGSH at GFORD1 / GBIBMT9D at IBMMAIL / tsgsh@vnet.ibm.com 0 /*--------------------------------------------------------------------*/ /* SYNCL REXX: SYNCHRONISE LOOSELY */ /* Steve Hayes (TSGSH at GFORD1, tsgsh@vnet.ibm.com) 1997-04-23 */ 1 0 Page 52 Appendix: Using EOFREPORT ANY ------------------------------------------------------------------------ 0 /*--------------------------------------------------------------------*/ trace o /* in case of stall */ signal on novalue /* No uninitialised variables*/ signal on failure /* Allow RC > 0 for a moment */ 'MAXSTREAM INPUT' /* Check only one stream */ maxstream = RC /* record max stream number */ call on error name error12 /* We should get RC 4 (or 8) */ do i = 0 to maxstream /* check that all streams */ 'SELECT BOTH' i /* are initially connected */ 'STREAMSTATE INPUT' /* and End if any of them is */ 'STREAMSTATE OUTPUT' /* not ready */ end /* */ signal on error /* end on EoF */ 'EOFREPORT ANY' /* propagate EoF back */ 'COMMIT 0' /* ready to roll */ do forever /* until EOF */ nextpeek = 0 /* first unpeeked record */ nextread = 0 /* first unconsumed record */ nextoutput = 0 /* first unwritten record */ do n = 0 while nextpeek \> maxstream,/* while any record has */ | nextread \> maxstream,/* not been peeked, written */ | nextoutput \> maxstream /* and consumed */ if nextoutput < nextpeek /* First choice: output the */ then do /* next peeked record */ 'SELECT OUTPUT' nextoutput /* Select the output stream */ call on error name error12 /* We may get RC 8 (or 4!) 
*/ 'STREAMSTATE OUTPUT' /* test it */ if RC = 0 /* It is waiting for output */ then do /* so write to it */ signal on error /* back to normal EoF test */ 'OUTPUT' record.nextoutput /* write peeked record */ nextoutput = nextoutput + 1 /* increment count */ drop record.nextoutput /* in case it is very long */ iterate n /* next lap round inner loop */ end /* output */ end /* end of first choice */ if nextread < nextoutput /* Second choice: consume */ then do /* next peeked record */ 'SELECT INPUT' nextread /* Select the input stream */ call on error name error12 /* We may get RC 8 (or 4!) */ 'STREAMSTATE INPUT' /* test it */ if RC = 0 /* It is waiting for output */ then do /* the next written record */ signal on error /* back to normal EoF test */ 'SELECT INPUT' nextread /* Select the input stream */ 'READTO' /* consume written record */ nextread = nextread + 1 /* increment count */ iterate n /* next lap round inner loop */ end /* readto */ end /* end of second choice */ if nextpeek \> maxstream /* Third choice: peek the */ then do /* next peeked record */ 'SELECT INPUT' nextpeek /* Select the input stream */ 1 0 Appendix: Using EOFREPORT ANY Page 53 ------------------------------------------------------------------------ 0 call on error name error12 /* We may get RC 8 (or 4!) */ 'STREAMSTATE INPUT' /* test it */ if RC = 0 /* It is waiting for output */ then do /* next un-peeked record */ signal on error /* back to normal EoF test */ 'SELECT INPUT' nextpeek /* Select the input stream */ 'PEEKTO record.'nextpeek /* consume written record */ nextpeek = nextpeek + 1 /* increment count */ iterate n /* next lap round inner loop */ end /* peekto */ end /* end of third choice */ signal off error /* nothing to do just now */ 'SUSPEND' /* let something else run */ if RC > 0 then iterate n /* try again */ /*----------------------------------------------------------------*/ /* OK, no more Mr. Nice Guy! 
*/ /* We've tested the state of every allowed stream and none of */ /* them are waiting and there are no other stages that want to */ /* run: just go for it */ /*----------------------------------------------------------------*/ signal on error /* end on EoF */ if nextoutput < nextpeek /* First choice: output the */ then do /* next peeked record */ 'SELECT OUTPUT' nextoutput /* Select the output stream */ 'OUTPUT' record.nextoutput /* write peeked record */ nextoutput = nextoutput + 1 /* increment count */ drop record.nextoutput /* in case it is very long */ iterate n /* next lap round inner loop */ end /* end of first choice */ if nextread < nextoutput /* Second choice: consume */ then do /* next peeked record */ 'SELECT INPUT' nextread /* Select the output stream */ 'SELECT INPUT' nextread /* Select the input stream */ 'READTO' /* consume written record */ nextread = nextread + 1 /* increment count */ iterate n /* next lap round inner loop */ end /* end of second choice */ if nextpeek \> maxstream /* Third choice: peek the */ then do /* next peeked record */ 'SELECT INPUT' nextpeek /* Select the output stream */ 'SELECT INPUT' nextpeek /* Select the input stream */ 'PEEKTO record.'nextpeek /* consume written record */ nextpeek = nextpeek + 1 /* increment count */ iterate n /* next lap round inner loop */ end /* end of third choice */ end n /* next action for this set */ end /* next set of records */ failure: error: exit (RC * (RC \= 12 & RC \= 8 & RC \= 4)) /* RC = 0 if EOF */ error12: if RC = 12 then signal error 1 0 Page 54 Appendix: Using EOFREPORT ANY ------------------------------------------------------------------------ 0 else return
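Stripped of the stream-state testing and suspension logic, the three-way preference order in SYNCL can be sketched in a few lines. This is a hypothetical Python rendering of the ordering rules only, not a substitute for the REXX stage (with no readiness constraints, the schedule degenerates into peek/output/consume per stream, in stream order):

```python
def syncl_schedule(nstreams):
    """Order of actions for one set of records under 'synchronise loosely':
    1) output the lowest-numbered record already peeked,
    2) consume the lowest-numbered record already output,
    3) peek the lowest-numbered record not yet peeked."""
    steps = []
    nextpeek = nextoutput = nextread = 0
    while nextread < nstreams:               # until every record is consumed
        if nextoutput < nextpeek:            # first choice: output it
            steps.append(("output", nextoutput))
            nextoutput += 1
        elif nextread < nextoutput:          # second choice: consume it
            steps.append(("consume", nextread))
            nextread += 1
        else:                                # third choice: peek the next one
            steps.append(("peek", nextpeek))
            nextpeek += 1
    return steps
```

In the real stage, each choice is additionally gated by STREAMSTATE, so actions on different streams can overlap when a partner is not ready; the invariant that matters is `nextread <= nextoutput <= nextpeek`, which this loop preserves.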