
Troubleshooting High CPU Usage of a Python Program on Linux



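On production machines running kafka-python==2.0.2 together with a recent gevent release, CPU usage was extremely high. This looks like a bug: https://github.com/dpkp/kafka-python/issues/2168 . The current guess is that, after gevent monkey patching, the Kafka consumer and its heartbeat consume a large amount of CPU.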

Environment: Debian 10, Python 3.7.3, kafka-python 2.0.2, gevent 21.12.0 (greenlet 1.1.2)

Test code

```python
# vim cppla.py
from gevent import monkey
from gevent.pywsgi import WSGIServer
monkey.patch_all()  # patch the standard library before the imports below

import datetime  # missing from the original listing; producer() needs it
from multiprocessing import cpu_count, Process

from flask import Flask, jsonify
from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import NoBrokersAvailable, KafkaTimeoutError

app = Flask(__name__)


@app.route("/cppla", methods=['GET'])
def function_benchmark():
    return jsonify(
        {
            "status": "ok",
        }
    ), 200


def run():
    mulserver = WSGIServer(('0.0.0.0', 8080), app)
    mulserver.start()

    def server_forever():
        mulserver.start_accepting()
        mulserver._stop_event.wait()

    # start one worker process per CPU core; each accepts on the listener above
    for i in range(cpu_count()):
        p = Process(target=server_forever)
        p.start()


KAFKA_URI = {
    "BOOTSTRAP_SERVERS": [
        '192.168.1.2:9092',
        '192.168.1.3:9092',
        '192.168.1.4:9092'
    ],
    "TOPIC": "test",
    "GROUP_ID": "v1",
    "KEY": "order"
}


class kafkaClient(object):
    def __init__(self):
        print("init start ")
        # both attributes are properties, so the clients are created right here
        self._producer_client = self._createProducer
        self._consumer_client = self._createConsumer
        print("init end ")

    @property
    def _createProducer(self):
        try:
            return KafkaProducer(
                bootstrap_servers=KAFKA_URI["BOOTSTRAP_SERVERS"],
                retries=3
            )
        except NoBrokersAvailable:
            print("no brokers")

    @property
    def _createConsumer(self):
        try:
            return KafkaConsumer(
                KAFKA_URI["TOPIC"],
                group_id=KAFKA_URI["GROUP_ID"],
                bootstrap_servers=KAFKA_URI["BOOTSTRAP_SERVERS"],
                auto_offset_reset="latest",
                enable_auto_commit=True,
                auto_commit_interval_ms=5000,
            )
        except NoBrokersAvailable:
            print("no brokers")

    @property
    def consumer(self):
        print("consumer function")
        try:
            for x in self._consumer_client:
                yield {
                    "partition": x.partition,
                    "timestamp": x.timestamp,
                    "offset": x.offset,
                    "value": x.value.decode()
                }
        except Exception as e:
            print(e)

    def producer(self, msg):
        print("producer function")
        if not self._producer_client:
            print("mark0")
            return False
        else:
            try:
                pre = datetime.datetime.now()
                self._producer_client.send(
                    topic=KAFKA_URI["TOPIC"],
                    key=KAFKA_URI["KEY"].encode(),
                    value=msg.encode()
                ).add_callback(self.on_send_success).add_errback(self.on_send_error)
                end = datetime.datetime.now()  # renamed from `next` (shadowed the builtin)
                if (end - pre).seconds > 60:
                    print("60s")
                return True
            except KafkaTimeoutError as e:
                print("timeout")
            except Exception as e:
                print("exception")
                return False

    def on_send_success(self, metadata):
        print(metadata)

    def on_send_error(self, excp):
        print("excp")


# created at import time, so the Kafka clients connect before run() starts
kafka_client = kafkaClient()

if __name__ == "__main__":
    run()
```


Check CPU usage with top

Find the process with ps

```shell
ps -ef | grep cppla
ps -aux | grep 26254
```


Per-thread CPU usage of the process

```shell
top -H -p 26254
# this program does not start any threads
```
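The per-thread figures that top -H displays come from /proc, so they can also be sampled from a script. The following is only a sketch under stated assumptions: it expects the Linux /proc layout, where fields 14 and 15 of a stat line are utime and stime in clock ticks, and the helper names parse_stat_cpu_ticks and thread_cpu_ticks are made up for this example.

```python
import os


def parse_stat_cpu_ticks(stat_line):
    """Return (command name, utime + stime in clock ticks) from one stat line."""
    # The command name is wrapped in parentheses and may itself contain
    # spaces, so split on the last ")" rather than on whitespace.
    comm_end = stat_line.rindex(")")
    name = stat_line[stat_line.index("(") + 1:comm_end]
    fields = stat_line[comm_end + 2:].split()
    # fields[0] is field 3 (state), so fields 14 and 15 sit at indexes 11 and 12
    return name, int(fields[11]) + int(fields[12])


def thread_cpu_ticks(pid):
    """Map each thread ID of `pid` to (name, accumulated CPU ticks)."""
    task_dir = "/proc/%d/task" % pid
    result = {}
    for tid in os.listdir(task_dir):
        try:
            with open(os.path.join(task_dir, tid, "stat")) as stat_file:
                result[int(tid)] = parse_stat_cpu_ticks(stat_file.read())
        except OSError:
            # the thread exited between listdir() and open()
            continue
    return result
```

Calling thread_cpu_ticks(26254) twice, a second apart, and subtracting the tick counts shows which thread ID is burning CPU. For this program a single entry is expected, since it starts no threads.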


Trace the process's execution stack

Note: the pstack installed via apt on Debian is unusable (its output is garbled). The pstack script shipped with CentOS, found via Google, was used instead. Save it as pstack.sh:

```shell
#!/bin/sh

if test $# -ne 1; then
    echo "Usage: `basename $0 .sh` <process-id>" 1>&2
    exit 1
fi

if test ! -r /proc/$1; then
    echo "Process $1 not found." 1>&2
    exit 1
fi

# GDB doesn't allow "thread apply all bt" when the process isn't
# threaded; need to peek at the process to determine if that or the
# simpler "bt" should be used.

backtrace="bt"
if test -d /proc/$1/task ; then
    # Newer kernel; has a task/ directory.
    if test `/bin/ls /proc/$1/task | /usr/bin/wc -l` -gt 1 2>/dev/null ; then
        backtrace="thread apply all bt"
    fi
elif test -f /proc/$1/maps ; then
    # Older kernel; go by it loading libpthread.
    if /bin/grep -e libpthread /proc/$1/maps > /dev/null 2>&1 ; then
        backtrace="thread apply all bt"
    fi
fi

GDB=${GDB:-/usr/bin/gdb}

# Run GDB, strip out unwanted noise.
# --readnever is no longer used since .gdb_index is now in use.
$GDB --quiet -nx $GDBARGS /proc/$1/exe $1 <<EOF 2>&1 |
set width 0
set height 0
set pagination no
$backtrace
EOF
/bin/sed -n \
    -e 's/^\((gdb) \)*//' \
    -e '/^#/p' \
    -e '/^Thread/p'
```

Then run it repeatedly against the process:

```shell
watch ./pstack.sh 26254
```
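pstack shows the C-level frames of the interpreter. For a Python-level view, the standard library's faulthandler module can dump the traceback of every thread when the process receives a signal. This is a possible addition to the workflow, not something the article's author ran. The choice of SIGUSR1 and the helper name install_stack_dump are assumptions, and under gevent the dump should only show the greenlet currently running in each thread.

```python
import faulthandler
import signal
import sys


def install_stack_dump(signum=signal.SIGUSR1, file=sys.stderr):
    """Dump the Python traceback of all threads to `file` when `signum` arrives."""
    # faulthandler writes straight to the file descriptor from its signal
    # handler, so `file` must stay open for as long as the handler is registered.
    faulthandler.register(signum, file=file, all_threads=True)
    return signum
```

With install_stack_dump() called near the top of cppla.py, running kill -USR1 26254 from a shell makes the process print its current Python stack to stderr without stopping it.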


The trace shows that gevent spends long stretches in epoll_wait / epoll_poll.

Trace the system calls

```shell
# apt install strace
strace -f -p 26254
```
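For a spinning process the strace output scrolls by too fast to read, so it can help to tally which system calls dominate. The sketch below assumes the trace was saved to a file first, for example with strace's -o option (the file name trace.txt is hypothetical). count_syscalls is a helper written for this example, not part of strace.

```python
import re
from collections import Counter

# Optional "[pid 1234] " prefix (added by strace -f), then the syscall name.
_SYSCALL_RE = re.compile(r"^(?:\[pid\s+\d+\]\s+)?(\w+)\(")


def count_syscalls(lines):
    """Count how often each syscall name appears in strace output lines."""
    counts = Counter()
    for line in lines:
        match = _SYSCALL_RE.match(line.strip())
        if match:  # skips comments, signal notices and "<... resumed>" lines
            counts[match.group(1)] += 1
    return counts
```

Usage could look like this: with open("trace.txt") as f: print(count_syscalls(f).most_common(5)). A busy loop shows up as one or two syscalls with counts far above everything else.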


Output

```
# epoll_wait dead loop
getpid() = 26254
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=439918115}) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=439940304}) = 0
epoll_wait(5, [{EPOLLOUT, {u32=7, u64=12884901895}}], 64, 0) = 1
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=439983956}) = 0
getpid() = 26254
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=440030506}) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=440052716}) = 0
epoll_wait(5, [{EPOLLOUT, {u32=7, u64=12884901895}}], 64, 0) = 1
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=440096569}) = 0
```
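In this trace, epoll_wait is called with a timeout of 0 and returns at once with EPOLLOUT for fd 7. That is how a level-triggered poller behaves when a writable socket stays registered for write events: it never blocks. The sketch below reproduces only this generic behavior with the standard selectors module (epoll-backed on Linux). It does not use gevent or kafka-python and makes no claim about where in those libraries the registration comes from. The function name count_immediate_wakeups is made up for the example.

```python
import selectors
import socket


def count_immediate_wakeups(iterations=1000):
    """Poll a writable socket with timeout 0 and count how often it is ready."""
    left, right = socket.socketpair()
    selector = selectors.DefaultSelector()
    try:
        # An idle connected socket has free send-buffer space, so it is writable.
        selector.register(left, selectors.EVENT_WRITE)
        wakeups = 0
        for _ in range(iterations):
            # timeout=0 mirrors the last argument of epoll_wait(5, ..., 64, 0)
            if selector.select(timeout=0):
                wakeups += 1
        return wakeups
    finally:
        selector.close()
        left.close()
        right.close()
```

Every poll reports the socket as ready, so a loop built around such a call spins at full speed, matching the syscall pattern in the strace output.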


Debugging results

I tested my code again: high CPU with gevent==20.9.0 and kafka-python==1.4.7.
I tested my code again: normal CPU with gevent==1.5.0 and kafka-python==2.0.2.
But in the GitHub issue, another test with gevent==20.9.0 and kafka-python==1.4.7 is reported as normal.

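After monkey patching, gevent issues a huge number of wait calls (the epoll_wait calls seen in the strace output), which amounts to a busy loop. Fix: after downgrading to gevent==1.5.0 (greenlet==0.4.15), everything went back to normal.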
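After a downgrade it is worth confirming that the interpreter running the service really picks up the pinned versions. The sketch below compares installed versions against the combination reported as normal in this article. The KNOWN_GOOD table and the helper names installed_version and compare_with_pins are assumptions made for illustration, and the pkg_resources fallback is there because importlib.metadata only exists from Python 3.8 on, while the environment here is Python 3.7.3.

```python
try:
    from importlib import metadata as importlib_metadata  # Python 3.8+
except ImportError:
    importlib_metadata = None

# Versions reported as working in this article (not an official compatibility list).
KNOWN_GOOD = {"gevent": "1.5.0", "greenlet": "0.4.15", "kafka-python": "2.0.2"}


def installed_version(name):
    """Return the installed version of a distribution, or None if it is absent."""
    if importlib_metadata is not None:
        try:
            return importlib_metadata.version(name)
        except importlib_metadata.PackageNotFoundError:
            return None
    import pkg_resources  # fallback for Python 3.7, provided by setuptools
    try:
        return pkg_resources.get_distribution(name).version
    except pkg_resources.DistributionNotFound:
        return None


def compare_with_pins(pins=KNOWN_GOOD, lookup=installed_version):
    """Map each package to (installed version, pinned version, versions match)."""
    report = {}
    for name, wanted in pins.items():
        found = lookup(name)
        report[name] = (found, wanted, found == wanted)
    return report
```

Printing compare_with_pins() at service start makes an accidental upgrade of gevent or greenlet visible in the logs.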

