source: LMDZ6/trunk/libf/phylmd/ecrad.v1.5.1/ecsort_shared.h @ 5451

Last change on this file since 5451 was 3908, checked in by idelkadi, 4 years ago

Online implementation of the radiative transfer code ECRAD in the LMDZ model.

  • Inclusion of the ecrad directory containing the sources of the ECRAD code
    • interface routine : radiation_scheme.F90
  • Adaptation of compilation scripts :
    • compilation under CPP key CPP_ECRAD
    • compilation with option "-rad ecard" or "-ecard true"
    • The "-rad old/rtm/ecran" build option will need to replace the "-rrtm true" and "-ecrad true" options in the future.
  • Runing LMDZ simulations with ecrad, you need :
    • logical key iflag_rrtm = 2 in physiq.def
    • namelist_ecrad (DefLists?)
    • the directory "data" containing the configuration files is temporarily placed in ../libfphylmd/ecrad/
  • Compilation and execution are tested in the 1D case. The repository under svn would allow to continue the implementation work: tests, verification of the results, ...
File size: 21.9 KB
Line 
1/**
2 * (C) Copyright 2014- ECMWF.
3 *
4 * This software is licensed under the terms of the Apache Licence Version 2.0
5 * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6 *
7 * In applying this licence, ECMWF does not waive the privileges and immunities
8 * granted to it by virtue of its status as an intergovernmental organisation
9 * nor does it submit to any jurisdiction.
10 */
11
12!*** ecsort_shared.h ***
13
14#if INT_VERSION == 4
15
16#define DATA_TYPE            INTEGER(KIND=JPIM)
17#define SIZEOF_ME            sizeof_int4
18#define KEYSORT_1D           INT4_KEYSORT_1D
19#define KEYSORT_1D_DRHOOKSTR 'ECSORT_MIX:INT4_KEYSORT_1D'
20#define KEYSORT_2D           INT4_KEYSORT_2D
21#define KEYSORT_2D_DRHOOKSTR 'ECSORT_MIX:INT4_KEYSORT_2D'
22#define KEYSORT_NUMBER       11
23#define RSORT_DRHOOKSTR      'ECSORT_MIX:RSORT32_FUNC_11'
24#define USE_RSORT64          0
25#define HEAPSORT             INT4_HEAPSORT
26#define HEAPSORT_DRHOOKSTR   'ECSORT_MIX:INT4_HEAPSORT'
27#define DBGPRINT             INT4_DBGPRINT
28#define DBGFMTNUM            1011
29#define ECQSORT_DRHOOKSTR    'ECSORT_MIX:INT4_ECQSORT'
30#define COUNT_DRHOOKSTR      'ECSORT_MIX:INT4_COUNT'
31#define GNOME_DRHOOKSTR      'ECSORT_MIX:INT4_GNOME'
32
33#elif REAL_VERSION == 8
34
35#define DATA_TYPE            REAL(KIND=JPRD)
36#define SIZEOF_ME            sizeof_real8
37#define KEYSORT_1D           REAL8_KEYSORT_1D
38#define KEYSORT_1D_DRHOOKSTR 'ECSORT_MIX:REAL8_KEYSORT_1D'
39#define KEYSORT_2D           REAL8_KEYSORT_2D
40#define KEYSORT_2D_DRHOOKSTR 'ECSORT_MIX:REAL8_KEYSORT_2D'
41#define KEYSORT_NUMBER       12
42#define RSORT_DRHOOKSTR      'ECSORT_MIX:RSORT64_12'
43#define USE_RSORT64          1
44#define HEAPSORT             REAL8_HEAPSORT
45#define HEAPSORT_DRHOOKSTR   'ECSORT_MIX:REAL8_HEAPSORT'
46#define DBGPRINT             REAL8_DBGPRINT
47#define DBGFMTNUM            1012
48#define ECQSORT_DRHOOKSTR    'ECSORT_MIX:REAL8_ECQSORT'
49#define COUNT_DRHOOKSTR      'ECSORT_MIX:REAL8_COUNT'
50#define GNOME_DRHOOKSTR      'ECSORT_MIX:REAL8_GNOME'
51
52#elif REAL_VERSION == 4
53
54#define DATA_TYPE            REAL(KIND=JPRM)
55#define SIZEOF_ME            sizeof_real4
56#define KEYSORT_1D           REAL4_KEYSORT_1D
57#define KEYSORT_1D_DRHOOKSTR 'ECSORT_MIX:REAL4_KEYSORT_1D'
58#define KEYSORT_2D           REAL4_KEYSORT_2D
59#define KEYSORT_2D_DRHOOKSTR 'ECSORT_MIX:REAL4_KEYSORT_2D'
60#define KEYSORT_NUMBER       13
61#define RSORT_DRHOOKSTR      'ECSORT_MIX:RSORT32_FUNC_13'
62#define USE_RSORT64          0
63#define HEAPSORT             REAL4_HEAPSORT
64#define HEAPSORT_DRHOOKSTR   'ECSORT_MIX:REAL4_HEAPSORT'
65#define DBGPRINT             REAL4_DBGPRINT
66#define DBGFMTNUM            1013
67#define ECQSORT_DRHOOKSTR    'ECSORT_MIX:REAL4_ECQSORT'
68#define COUNT_DRHOOKSTR      'ECSORT_MIX:REAL4_COUNT'
69#define GNOME_DRHOOKSTR      'ECSORT_MIX:REAL4_GNOME'
70
71#elif INT_VERSION == 8
72
73#define DATA_TYPE            INTEGER(KIND=JPIB)
74#define SIZEOF_ME            sizeof_int8
75#define KEYSORT_1D           INT8_KEYSORT_1D
76#define KEYSORT_1D_DRHOOKSTR 'ECSORT_MIX:INT8_KEYSORT_1D'
77#define KEYSORT_2D           INT8_KEYSORT_2D
78#define KEYSORT_2D_DRHOOKSTR 'ECSORT_MIX:INT8_KEYSORT_2D'
79#define KEYSORT_NUMBER       14
80#define RSORT_DRHOOKSTR      'ECSORT_MIX:RSORT64_14'
81#define USE_RSORT64          1
82#define HEAPSORT             INT8_HEAPSORT
83#define HEAPSORT_DRHOOKSTR   'ECSORT_MIX:INT8_HEAPSORT'
84#define DBGPRINT             INT8_DBGPRINT
85#define DBGFMTNUM            1014
86#define ECQSORT_DRHOOKSTR    'ECSORT_MIX:INT8_ECQSORT'
87#define COUNT_DRHOOKSTR      'ECSORT_MIX:INT8_COUNT'
88#define GNOME_DRHOOKSTR      'ECSORT_MIX:INT8_GNOME'
89
90#else
91
92  ERROR in programming : No datatype given (should never have ended up here)
93
94#endif
95
96!----------------------------
97!--   Public subroutines   --
98!----------------------------
99
100
101SUBROUTINE KEYSORT_1D(rc, a, n, method, descending, index, init)
102!-- Please note that we assume that a(:) occupies consecutive memory locations
103INTEGER(KIND=JPIM), intent(out)           :: rc
104DATA_TYPE         , intent(inout)         :: a(:)
105INTEGER(KIND=JPIM), intent(in)            :: n
106INTEGER(KIND=JPIM), intent(in), OPTIONAL  :: method
107logical, intent(in), OPTIONAL  :: descending
108INTEGER(KIND=JPIM), intent(inout), TARGET, OPTIONAL :: index(:)
109logical, intent(in), OPTIONAL  :: init
110! === END OF INTERFACE BLOCK ===
111DATA_TYPE          , allocatable :: aa(:,:)
112INTEGER(KIND=JPIM) :: imethod, irev, idummy, index_adj
113logical :: LLfast, LLdescending, LLomp_okay, LLinit
114INTEGER(KIND=JPIM) :: ITID, ichunk, iret, inumt
115REAL(KIND=JPRB) :: ZHOOK_HANDLE
116
117IF (LHOOK) CALL DR_HOOK(KEYSORT_1D_DRHOOKSTR,0,ZHOOK_HANDLE)
118
119rc = 0
120if (n <= 0 .or. size(a) <= 0) goto 99
121
122if (present(descending)) then
123  LLdescending = descending
124else
125  LLdescending = .FALSE.
126endif
127
128irev = 0
129if (LLdescending) irev = 1
130
131ITID = OML_MY_THREAD()
132imethod = current_method(ITID)
133if (present(method)) then
134  imethod = min(max(min_method,method),max_method)
135endif
136
137if (imethod /= quicksort_method .and. &
138   &imethod /= countingsort_method) then
139  LLfast = .FALSE.
140else if (imethod == quicksort_method) then
141  !-- hasn't been implemented if index is present ;-(
142  LLfast = (&
143       & .not.present(index) .and. &
144       & .not.present(init))
145else if (imethod == countingsort_method) then
146  !-- index-presence is ok
147  LLfast = .TRUE.
148endif
149
150if (LLfast) then
151  !- Only Quick-sort & CountingSort covered
152
153  if (imethod == quicksort_method) then
154    inumt = OML_MAX_THREADS()
155    LLomp_okay = (inumt > 1 .and. nomp >= inumt .and. n >= nomp)
156    LLomp_okay = (LLomp_okay .and. .not. OML_IN_PARALLEL()) ! Prevents nested OpenMP
157    if (LLomp_okay) then
158      !-- Max 2-way OpenMP parallelism for now ...
159      ichunk = n/2
160!$OMP PARALLEL PRIVATE(iret)
161!$OMP SECTIONS
162!$OMP SECTION
163      CALL ecqsortfast(KEYSORT_NUMBER, ichunk, a(1), irev, iret)
164!$OMP SECTION
165      CALL ecqsortfast(KEYSORT_NUMBER, n-ichunk, a(ichunk+1), irev, iret)
166!$OMP END SECTIONS
167!$OMP END PARALLEL
168      CALL ecmerge2(KEYSORT_NUMBER, 1, ichunk, n-ichunk, a(1), &
169           & idummy, 0, 1, irev, idummy, rc)
170    else
171      CALL ecqsortfast(KEYSORT_NUMBER, n, a(1), irev, rc)
172    endif
173    GOTO 99
174
175  else if (imethod == countingsort_method) then
176    if (.not.present(index)) then
177      CALL ec_countingsort(KEYSORT_NUMBER, n, 1, 1, a(1), idummy, 0, 1, irev, rc)
178    else
179      LLinit = .FALSE.
180      if (present(init)) LLinit = init
181      if (LLinit) then
182        CALL init_index(index, index_adj=-1)
183        index_adj = 0
184      else
185        index_adj = 1
186      endif
187      CALL ec_countingsort(KEYSORT_NUMBER, n, 1, 1, a(1), index(1), size(index), index_adj, irev, rc)
188      if (index_adj == 0) CALL adjust_index(index, +1)
189    endif
190    GOTO 99
191
192  else
193    LLfast = .false.
194  endif
195endif
196
197!-- LLfast == .FALSE. :
198
199allocate(aa(n,1))
200
201if (LLdescending) then
202  aa(1:n,1) = -a(1:n)
203else
204  aa(1:n,1) = a(1:n)
205endif
206
207CALL keysort(rc, aa, n, method=method, index=index, init=init)
208
209if (LLdescending) then
210  a(1:n) = -aa(1:n,1)
211else
212  a(1:n) = aa(1:n,1)
213endif
214
215deallocate(aa)
216
21799 continue
218IF (LHOOK) CALL DR_HOOK(KEYSORT_1D_DRHOOKSTR,1,ZHOOK_HANDLE,n)
219END SUBROUTINE
220
221
222SUBROUTINE KEYSORT_2D(&
223     &rc, a, n,&
224     &key, multikey, method,&
225     &index, init, transposed)
226
227INTEGER(KIND=JPIM), intent(out)           :: rc
228DATA_TYPE         , intent(inout)         :: a(:,:)
229INTEGER(KIND=JPIM), intent(in)            :: n
230INTEGER(KIND=JPIM), intent(in), OPTIONAL  :: key, method
231INTEGER(KIND=JPIM), intent(in), OPTIONAL  :: multikey(:)
232logical, intent(in), OPTIONAL  :: transposed
233INTEGER(KIND=JPIM), intent(inout), TARGET, OPTIONAL :: index(:)
234logical, intent(in), OPTIONAL  :: init
235! === END OF INTERFACE BLOCK ===
236INTEGER(KIND=JPIM), POINTER :: iindex(:)
237INTEGER(KIND=JPIM) :: ikey, istride, imethod, inumkeys, imethod_1st, imethod_rest
238INTEGER(KIND=JPIM) :: lda, iptr, i, j, sda, idiff, irev, inumt, jkey, jj, ilastkey
239INTEGER(KIND=JPIM) :: j1, j2, jmid, inum, imax, iadd, imod, iret, inc, iamax, ibmax
240DATA_TYPE         , allocatable :: data(:)
241INTEGER(KIND=JPIM), allocatable :: ikeys(:), ista(:), ichunk(:), irank(:)
242logical LLinit, LLdescending, LLtrans, LLomp_okay, LLadjusted, LLdebug, LLomp_prefix
243character(len=1) clenv
244REAL(KIND=JPRB) :: ZHOOK_HANDLE, ZHOOK_SUBHANDLE
245REAL(KIND=JPRB) :: ZHOOK_SUBHANDLE0
246REAL(KIND=JPRB) :: ZHOOK_SUBHANDLE1
247REAL(KIND=JPRB) :: ZHOOK_SUBHANDLE2
248REAL(KIND=JPRB) :: ZHOOK_SUBHANDLE3
249INTEGER(KIND=JPIM) :: ITID
250
251IF (LHOOK) CALL DR_HOOK(KEYSORT_2D_DRHOOKSTR,0,ZHOOK_HANDLE)
252
253rc = 0
254lda = size(a, dim=1)
255sda = size(a, dim=2)
256if (n <= 0 .or. lda <= 0 .or. sda <= 0) goto 99
257
258inumt = OML_MAX_THREADS()
259ITID = OML_MY_THREAD()
260imethod = current_method(ITID)
261if (present(method)) then
262  imethod = min(max(min_method,method),max_method)
263endif
264imethod_1st = imethod
265imethod_rest = imethod
266
267ikey = 1
268if (present(key)) ikey = key
269
270if (present(multikey)) then
271  allocate(ikeys(size(multikey)))
272  ikeys(:) = multikey(:)
273else
274  allocate(ikeys(1))
275  ikeys(1) = ikey
276endif
277inumkeys = size(ikeys)
278
279!-- Only the RADIX-sort & now also QUICK-sort & CountingSort give the result we want with multiple keys
280if (inumkeys > 1 .and. &
281    imethod /= radixsort_method .and. &
282    imethod /= quicksort_method .and. &
283    imethod /= countingsort_method) then
284   imethod = default_method
285   imethod_1st = imethod
286   imethod_rest = imethod
287   !-- Since "default_method" may now be [overridden as] HEAP-sort, make sure its then "radixsort_method"
288   !   Note: The first sweep may still be e.g. HEAP-sort
289   if (imethod /= radixsort_method .and. &
290       imethod /= quicksort_method .and. &
291       imethod /= countingsort_method) then
292     imethod = radixsort_method
293     imethod_rest = imethod
294   endif
295endif
296
297LLinit = .FALSE.
298if (present(init)) LLinit = init
299
300if (present(index)) then
301  iindex => index(1:n)
302else
303  allocate(iindex(n))
304  LLinit = .TRUE.
305endif
306
307if (LLinit) CALL init_index(iindex)
308
309istride = 1
310LLtrans = .FALSE.
311if (present(transposed)) LLtrans = transposed
312if (LLtrans) then
313  istride = lda
314else if (sda >= 2 .and. lda >= 1) then
315!-- Check for presence of sub-array and adjust lda automatically
316  call addrdiff(a(1,1),a(1,2),idiff)
317  ! lda below: The true leading dimension; overrides sub-arrays one
318  lda = idiff/SIZEOF_ME
319endif
320
321ilastkey = 0
322LLadjusted = .FALSE.
323LLomp_prefix = .FALSE.
324!$ LLomp_prefix = (istride == 1 .and. nomp >= inumt .and. n >= nomp)
325if (LLomp_prefix) then
326  call get_environment_variable('EC_SORTING_DEBUG',clenv)
327  LLdebug = (clenv == '1' .and. n < 10000)
328  if (LLdebug) write(0,*)'>> EC_SORTING_DEBUG=1'
329else
330  LLdebug = .FALSE.
331endif
332
3331000 format(1x,a,2i12,:,/,(10i5))
3341001 format(1x,'[#',i2,']:',a,(10i5))
3351002 format(1x,'[#',i2,']:',a,:,/,(10i5))
3361003 format(1x,'[#',i2,']:',a,2i12,:,/,(10i5))
3371004 format(1x,a,:,(10i5))
3381005 format(1x,a,i2,1x,a)
339
340imethod = imethod_1st
341KEYLOOP: do jkey=inumkeys,1,-1
342!--   Sort by the least significant key first
343  ikey = abs(ikeys(jkey))
344  if (ikey == 0) cycle KEYLOOP
345
346  if (istride == 1) then
347    iptr = lda * (ikey - 1) + 1
348  else
349    iptr = ikey
350  endif
351
352  if (LLdebug) then
353    write(0,1000) '<BEGIN>iindex(1:n)=',n,sum(iindex(1:n)),iindex(1:n)
354    if (LLadjusted) then
355      CALL DBGPRINT(-jkey,'<BEGIN>',a,iindex,n,ikey,1,n,1)
356    else
357      CALL DBGPRINT(-jkey,'<BEGIN>',a,iindex,n,ikey,1,n,0)
358    endif
359    ilastkey = ikey
360  endif
361
362  LLdescending = (ikeys(jkey) < 0)
363  irev = 0
364  if (LLdescending) irev = 1
365
366  !-- Since "irev" is passed into the ecqsort, no explicit reversing is needed --> savings
367  if (imethod == quicksort_method .or. &
368      imethod == countingsort_method) LLdescending = .FALSE.
369
370  if (LLdescending) then
371    if (istride == 1) then
372      a(1:n,ikey) = -a(1:n,ikey)
373    else
374      a(ikey,1:n) = -a(ikey,1:n)
375    endif
376    irev = 0 ! prevents use of "reverse" algorithm in ecmerge2 for radix-sort
377  endif
378
379  LLomp_okay = LLomp_prefix .and. (inumt > 1) .and. (&
380       & imethod == radixsort_method .or. &
381       & imethod == quicksort_method .or. &
382       & imethod == countingsort_method)
383  LLomp_okay = LLomp_okay .and. (.not. OML_IN_PARALLEL()) ! Prevents nested OpenMP
384
385  if (.not.LLomp_okay) then
386    select case (imethod)
387    case (radixsort_method)
388      IF (LHOOK) CALL DR_HOOK(RSORT_DRHOOKSTR,0,ZHOOK_SUBHANDLE0)
389#if USE_RSORT64 == 1
390      CALL rsort64(KEYSORT_NUMBER, n, istride, iptr, a(1,1), iindex(1), 1, rc)
391#else
392      CALL rsort32_func(KEYSORT_NUMBER, n, istride, iptr, a(1,1), iindex(1), 1, rc)
393#endif
394      IF (LHOOK) CALL DR_HOOK(RSORT_DRHOOKSTR,1,ZHOOK_SUBHANDLE0, n)
395    case (heapsort_method)
396      if (istride == 1) then
397        CALL HEAPSORT(KEYSORT_NUMBER, n, a(1:n, ikey), rc, irev, istride, iindex)
398      else
399        CALL HEAPSORT(KEYSORT_NUMBER, n, a(ikey, 1:n), rc, irev, istride, iindex)
400      endif
401    case (quicksort_method)
402      IF (LHOOK) CALL DR_HOOK(ECQSORT_DRHOOKSTR,0,ZHOOK_SUBHANDLE0)
403      CALL ecqsort(KEYSORT_NUMBER, n, istride, iptr, a(1,1), iindex(1), 1, irev, rc)
404      IF (LHOOK) CALL DR_HOOK(ECQSORT_DRHOOKSTR,1,ZHOOK_SUBHANDLE0,n)
405    case (countingsort_method)
406      IF (LHOOK) CALL DR_HOOK(COUNT_DRHOOKSTR,0,ZHOOK_SUBHANDLE0)
407      CALL ec_countingsort(KEYSORT_NUMBER, n, istride, iptr, a(1,1), iindex(1), n, 1, irev, rc)
408      IF (LHOOK) CALL DR_HOOK(COUNT_DRHOOKSTR,1,ZHOOK_SUBHANDLE0,n)
409    case (gnomesort_method)
410      IF (LHOOK) CALL DR_HOOK(GNOME_DRHOOKSTR,0,ZHOOK_SUBHANDLE0)
411      CALL ecgnomesort(KEYSORT_NUMBER, n, istride, iptr, a(1,1), iindex(1), n, 1, rc)
412      IF (LHOOK) CALL DR_HOOK(GNOME_DRHOOKSTR,1,ZHOOK_SUBHANDLE0,n)
413    end select
414
415  else ! i.e. LLomp_okay ; radix, quick & counting -sorts only
416    if (.not.allocated(ista)) then
417      allocate(ista(inumt+1),ichunk(inumt))
418      inc = n/inumt
419      iadd = 1
420      imod = mod(n,inumt)
421      if (imod == 0) iadd = 0
422      ista(1) = 1
423      do j=2,inumt
424        ista(j) = ista(j-1) + inc + iadd
425        if (iadd > 0 .and. j > imod) iadd = 0
426      enddo
427      ista(inumt+1) = n + 1
428      do j=1,inumt
429        ichunk(j) = ista(j+1) - ista(j)
430      enddo
431      if (LLdebug) then
432        write(0,1005) '>> imethod,name=',imethod,method_name(imethod)
433        write(0,1004) '>> inumt,n,nomp=',inumt,n,nomp
434        write(0,1004) '>> ista(1:inumt+1)=',ista(1:inumt+1)
435        write(0,1004) '>> ichunk(1:inumt)=',ichunk(1:inumt)
436      endif
437      allocate(irank(n))
438    endif
439
440    if (LLdebug) write(0,1004) '>>KEYLOOP: jkey,ikey,irev,iptr=',jkey,ikey,irev,iptr
441
442    if (.not.LLadjusted) then ! only once
443      if (LLdebug) write(0,1000) '<1>iindex(1:n)=',n,sum(iindex(1:n)),iindex(1:n)
444      call adjust_index(iindex, -1) ! Fortran -> C
445      if (LLdebug) write(0,1000) '<2>iindex(1:n)=',n,sum(iindex(1:n))+n,iindex(1:n)
446      LLadjusted = .TRUE.
447    endif
448
449    if (LLdebug) write(0,*)'>> Sorting inumt-chunks in parallel'
450!$OMP PARALLEL PRIVATE(j,j1,j2,inum,iret,inc,ITID,ZHOOK_SUBHANDLE1,ZHOOK_SUBHANDLE2)
451    IF (LHOOK) CALL DR_HOOK('ECSORT_MIX:KEYSORT_2D>OMPSORT',0,ZHOOK_SUBHANDLE1)
452    ITID = OML_MY_THREAD()
453!$OMP DO SCHEDULE(DYNAMIC,1)
454    do j=1,inumt
455      j1 = ista(j)
456      inum = ichunk(j)
457      j2 = j1 + inum - 1
458      inc = j1
459      if (LLdebug) write(0,1001) ITID,'j,j1,j2,inum,inc=',j,j1,j2,inum,inc
460      if (LLdebug) write(0,1002) ITID,'iindex(j1:j2) > ',iindex(j1:j2)
461      select case (imethod)
462      case (radixsort_method)
463        IF (LHOOK) CALL DR_HOOK(RSORT_DRHOOKSTR,0,ZHOOK_SUBHANDLE2)
464#if USE_RSORT64 == 1
465        CALL rsort64(KEYSORT_NUMBER, inum, istride, iptr, a(1,1), iindex(j1), 0, iret)
466#else
467        CALL rsort32_func(KEYSORT_NUMBER, inum, istride, iptr, a(1,1), iindex(j1), 0, iret)
468#endif
469        IF (LHOOK) CALL DR_HOOK(RSORT_DRHOOKSTR,1,ZHOOK_SUBHANDLE2, inum)
470      case (quicksort_method)
471        IF (LHOOK) CALL DR_HOOK(ECQSORT_DRHOOKSTR,0,ZHOOK_SUBHANDLE2)
472        CALL ecqsort(KEYSORT_NUMBER, inum, istride, iptr, a(1,1), iindex(j1), 0, irev, iret)
473        IF (LHOOK) CALL DR_HOOK(ECQSORT_DRHOOKSTR,1,ZHOOK_SUBHANDLE2,inum)
474      case (countingsort_method)
475        IF (LHOOK) CALL DR_HOOK(COUNT_DRHOOKSTR,0,ZHOOK_SUBHANDLE2)
476        CALL ec_countingsort(KEYSORT_NUMBER, inum, istride, iptr, a(1,1), iindex(j1), inum, 0, irev, iret)
477        IF (LHOOK) CALL DR_HOOK(COUNT_DRHOOKSTR,1,ZHOOK_SUBHANDLE2,inum)
478      end select
479      if (LLdebug) write(0,1002) ITID,'iindex(j1:j2) < ',iindex(j1:j2)
480    enddo
481!$OMP END DO
482    IF (LHOOK) CALL DR_HOOK('ECSORT_MIX:KEYSORT_2D>OMPSORT',1,ZHOOK_SUBHANDLE1)
483!$OMP END PARALLEL
484
485    if (LLdebug) write(0,1000) '<after_sort>iindex(1:n)=',n,sum(iindex(1:n))+n,iindex(1:n)
486    if (LLdebug) CALL DBGPRINT(0,'<after_sort>',a,iindex,n,ikey,1,n,1)
487    CALL get_rank(iindex, irank, index_adj=+1)
488
489    if (LLdebug) write(0,*) '>> Merge neighbouring chunks in parallel as much as possible'
490    inc = 2
491    imax = (inumt+inc-1)/inc
492    do jj=1,imax
493      if (LLdebug) write(0,1001) jj,'<before_merge> jj,inc,imax,inumt=',jj,inc,imax,inumt
494!$OMP PARALLEL PRIVATE(j,j1,j2,inum,iamax,ibmax,jmid,iret,ZHOOK_SUBHANDLE3,ITID)
495      IF (LHOOK) CALL DR_HOOK('ECSORT_MIX:KEYSORT_2D>OMPMERGE',0,ZHOOK_SUBHANDLE3)
496      ITID = OML_MY_THREAD()
497!$OMP DO SCHEDULE(DYNAMIC,1)
498      do j=1,inumt,inc
499        j1 = j
500        j2 = j + inc - 1
501        jmid = (j1 + j2)/2 + 1
502        j2 = min(j2,inumt)
503        jmid = min(jmid,inumt)
504        if (LLdebug) write(0,1001) ITID,'j,j1,j2,jmid=',j,j1,j2,jmid
505        iamax = ista(jmid) - ista(j1)
506        inum = sum(ichunk(j1:j2))
507        ibmax = inum - iamax
508        if (LLdebug) write(0,1001) ITID,'j,iamax,ibmax,inum=',j,iamax,ibmax,inum
509        if (iamax == 0 .or. ibmax == 0 .or. inum == 0) cycle
510        j1 = ista(j1)
511        j2 = ista(j2+1) - 1
512        if (LLdebug) write(0,1001) ITID,'j,j1,j2,inum=',j,j1,j2,inum
513        if (LLdebug) write(0,1002) ITID,'iindex(j1:j2) > ',iindex(j1:j2)
514        call ecmerge2(KEYSORT_NUMBER, iptr, iamax, ibmax, a(1,1), &
515             & iindex(j1), inum, 0, irev, irank(1), iret)
516        if (LLdebug) write(0,1002) ITID,'iindex(j1:j2) < ',iindex(j1:j2)
517      enddo ! do j=1,inumt,inc
518!$OMP END DO
519      IF (LHOOK) CALL DR_HOOK('ECSORT_MIX:KEYSORT_2D>OMPMERGE',1,ZHOOK_SUBHANDLE3)
520!$OMP END PARALLEL
521
522      if (LLdebug) write(0,1003) jj,'<after_merge>iindex(1:n)=',n,sum(iindex(1:n))+n,iindex(1:n)
523      if (LLdebug) CALL DBGPRINT(jj,'<after_merge>',a,iindex,n,ikey,1,n,1)
524
525      inc = inc * 2
526    enddo ! do jj=1,imax
527    rc = n
528  endif ! if (LLomp_okay)
529
530  if (LLdescending) then
531    if (istride == 1) then
532      a(1:n,ikey) = -a(1:n,ikey)
533    else
534      a(ikey,1:n) = -a(ikey,1:n)
535    endif
536  endif
537
538  if (LLadjusted .and. imethod /= imethod_rest) then ! Restore back immediately
539    if (LLdebug) write(0,1000) '<3a>iindex(1:n)=',n,sum(iindex(1:n))+n,iindex(1:n)
540    call adjust_index(iindex, +1) ! C -> Fortran
541    if (LLdebug) write(0,1000) '<4a>iindex(1:n)=',n,sum(iindex(1:n)),iindex(1:n)
542    LLadjusted = .FALSE.
543  endif
544
545  imethod = imethod_rest
546enddo KEYLOOP
547
548deallocate(ikeys)
549if (allocated(ista)) deallocate(ista)
550if (allocated(ichunk)) deallocate(ichunk)
551if (allocated(irank)) deallocate(irank)
552
553if (LLadjusted) then ! Restore back
554  if (LLdebug) write(0,1000) '<3b>iindex(1:n)=',n,sum(iindex(1:n))+n,iindex(1:n)
555  call adjust_index(iindex, +1) ! C -> Fortran
556  if (LLdebug) write(0,1000) '<4b>iindex(1:n)=',n,sum(iindex(1:n)),iindex(1:n)
557  LLadjusted = .FALSE.
558endif
559
560if (LLdebug) write(0,1000) '<END>iindex(1:n)=',n,sum(iindex(1:n)),iindex(1:n)
561if (LLdebug) CALL DBGPRINT(0,'<END>',a,iindex,n,ilastkey,1,n,0)
562
563if (.not.present(index)) then
564  LLomp_okay = (nomp >= inumt .and. n >= nomp)
565  if (istride == 1) then
566    LLomp_okay = (LLomp_okay .and. sda >= inumt .and. .not. OML_IN_PARALLEL()) ! Prevents nested OpenMP
567!$OMP PARALLEL PRIVATE(j,data) IF (LLomp_okay)
568    allocate(data(n))
569!$OMP DO SCHEDULE(DYNAMIC,1)
570    do j=1,sda
571      data(1:n) = a(iindex(1:n),j)
572      a(1:n,j) = data(1:n)
573    enddo
574!$OMP END DO
575    deallocate(data)
576!$OMP END PARALLEL
577  else
578    LLomp_okay = (LLomp_okay .and. lda >= inumt .and. .not. OML_IN_PARALLEL()) ! Prevents nested OpenMP
579!$OMP PARALLEL PRIVATE(i,data) IF (LLomp_okay)
580    allocate(data(n))
581!$OMP DO SCHEDULE(DYNAMIC,1)
582    do i=1,lda
583      data(1:n) = a(i,iindex(1:n))
584      a(i,1:n) = data(1:n)
585    enddo
586!$OMP END DO
587    deallocate(data)
588!$OMP END PARALLEL
589  endif
590
591  deallocate(iindex)
592endif
593
59499 continue
595IF (LHOOK) CALL DR_HOOK(KEYSORT_2D_DRHOOKSTR,1,ZHOOK_HANDLE,n*inumkeys)
596END SUBROUTINE
597
598!-----------------------------
599!--   Private subroutines   --
600!-----------------------------
601
602SUBROUTINE DBGPRINT(jj, cdstr, a, index, n, key, k1, k2, kadd)
603character(len=*), intent(in) :: cdstr
604INTEGER(KIND=JPIM), intent(in) :: jj, n, key, k1, k2, kadd
605INTEGER(KIND=JPIM), intent(in) :: index(:)
606DATA_TYPE, intent(in) :: a(:,:)
607INTEGER(KIND=JPIM) :: i,j
6081000 FORMAT(i3,a,5i5)
6091011 FORMAT((5i12)) ! integer*4
6101012 FORMAT(1p,(5g20.12)) ! real*8
6111013 FORMAT(1p,(5g20.12)) ! real*4
6121014 FORMAT((5i12)) ! integer*8
613WRITE(0,1000) jj,cdstr//': n,key,k1,k2,kadd,a(index(k1:k2)+kadd,:)=',&
614     &                     n,key,k1,k2,kadd
615do j=k1,k2
616  i = index(j)+kadd
617  WRITE(0,'(2i6)',advance='no') j,i-kadd
618  WRITE(0,DBGFMTNUM) a(i,:)
619enddo
620END SUBROUTINE
621
622SUBROUTINE HEAPSORT(id, n, a, rc, irev, istride, index)
623INTEGER(KIND=JPIM), intent(in) :: id, n, irev, istride
624DATA_TYPE, intent(in) :: a(:)
625INTEGER(KIND=JPIM), intent(out) :: rc
626INTEGER(KIND=JPIM), intent(inout) :: index(:)
627INTEGER(KIND=JPIM) :: i,j,right,left,idx
628DATA_TYPE :: tmp
629REAL(KIND=JPRB) :: ZHOOK_HANDLE
630IF (LHOOK) CALL DR_HOOK(HEAPSORT_DRHOOKSTR,0,ZHOOK_HANDLE)
631rc = 0
632if (n <= 0 .or. size(a) <= 0) goto 99
633left  = n/2+1
634right = n
635LOOP: do
636  if (left > 1) then
637    left = left - 1
638    idx  = index(left)
639  else
640    idx = index(right)
641    index(right) = index(1)
642    right = right - 1
643    if (right == 1) then
644      index(1) = idx
645      exit LOOP
646    endif
647  endif
648  tmp = a(idx)
649  i = left
650  j = 2*left
651  do while (j <= right)
652    if (j < right) then
653      if (a(index(j)) < a(index(j+1))) j = j + 1
654    endif
655    if (tmp < a(index(j))) then
656      index(i) = index(j)
657      i = j
658      j = 2*j
659    else
660      j = right + 1
661    endif
662  enddo
663  index(i) = idx
664enddo LOOP
665rc = n
66699 continue
667IF (LHOOK) CALL DR_HOOK(HEAPSORT_DRHOOKSTR,1,ZHOOK_HANDLE)
668END SUBROUTINE
669
670
671
672#ifndef NO_UNDEF
673
674#undef DATA_TYPE
675#undef SIZEOF_ME
676#undef KEYSORT_1D
677#undef KEYSORT_1D_DRHOOKSTR
678#undef KEYSORT_2D
679#undef KEYSORT_2D_DRHOOKSTR
680#undef KEYSORT_NUMBER
681#undef RSORT_DRHOOKSTR
682#undef USE_RSORT64
683#undef QSORTFAST_DRHOOKSTR
684#undef HEAPSORT
685#undef HEAPSORT_DRHOOKSTR
686#undef DBGPRINT
687#undef DBGFMTNUM
688#undef ECQSORT_DRHOOKSTR
689#undef COUNT_DRHOOKSTR
690#undef GNOME_DRHOOKSTR
691
692#endif
Note: See TracBrowser for help on using the repository browser.