Troubleshooting High CPU Usage of a Python Program on Linux
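On a production machine, kafka-python==2.0.2 combined with a recent gevent release showed extremely high CPU usage. This looks like a bug: https://github.com/dpkp/kafka-python/issues/2168 . The current guess is that, once gevent has monkey-patched the process, the Kafka consumer and its heartbeat consume a large amount of CPU.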
Environment
Debian 10
Python 3.7.3
kafka-python 2.0.2
gevent 21.12.0 (greenlet 1.1.2)
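Since the problem depends on the exact version combination, it helps to record what is actually installed. The following is a minimal sketch, not part of the original setup; the helper names `installed_version` and `report` are made up for illustration, and it assumes Python 3.8+ for `importlib.metadata` (on the Python 3.7.3 listed above, the `importlib_metadata` backport would be needed instead).

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def report(packages=("kafka-python", "gevent", "greenlet")):
    """Map each package name to its installed version (or None)."""
    return {name: installed_version(name) for name in packages}


if __name__ == "__main__":
    for name, version in report().items():
        print(f"{name}=={version}" if version else f"{name}: not installed")
```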
Test code
```python
# vim cppla.py
from gevent import monkey
from gevent.pywsgi import WSGIServer
monkey.patch_all()

import datetime
from multiprocessing import cpu_count, Process

from flask import Flask, jsonify
from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import NoBrokersAvailable, KafkaTimeoutError

app = Flask(__name__)


@app.route("/cppla", methods=['GET'])
def function_benchmark():
    return jsonify(
        {
            "status": "ok",
        }
    ), 200


def run():
    mulserver = WSGIServer(('0.0.0.0', 8080), app)
    mulserver.start()

    def server_forever():
        mulserver.start_accepting()
        mulserver._stop_event.wait()

    # Start one worker process per CPU core; they share the listening socket.
    for i in range(cpu_count()):
        p = Process(target=server_forever)
        p.start()


KAFKA_URI = {
    "BOOTSTRAP_SERVERS": [
        '192.168.1.2:9092',
        '192.168.1.3:9092',
        '192.168.1.4:9092'
    ],
    "TOPIC": "test",
    "GROUP_ID": "v1",
    "KEY": "order"
}


class kafkaClient(object):
    def __init__(self):
        print("init start ")
        self._producer_client = self._createProducer
        self._consumer_client = self._createConsumer
        print("init end ")

    @property
    def _createProducer(self):
        # Returns None (after logging) when no broker is reachable.
        try:
            return KafkaProducer(
                bootstrap_servers=KAFKA_URI["BOOTSTRAP_SERVERS"],
                retries=3
            )
        except NoBrokersAvailable:
            print("no brokers")

    @property
    def _createConsumer(self):
        # Returns None (after logging) when no broker is reachable.
        try:
            return KafkaConsumer(
                KAFKA_URI["TOPIC"],
                group_id=KAFKA_URI["GROUP_ID"],
                bootstrap_servers=KAFKA_URI["BOOTSTRAP_SERVERS"],
                auto_offset_reset="latest",
                enable_auto_commit=True,
                auto_commit_interval_ms=5000,
            )
        except NoBrokersAvailable:
            print("no brokers")

    @property
    def consumer(self):
        # Generator that yields each consumed message as a plain dict.
        print("consumer function")
        try:
            for x in self._consumer_client:
                yield {
                    "partition": x.partition,
                    "timestamp": x.timestamp,
                    "offset": x.offset,
                    "value": x.value.decode()
                }
        except Exception as e:
            print(e)

    def producer(self, msg):
        print("producer function")
        if not self._producer_client:
            print("mark0")
            return False
        else:
            try:
                pre = datetime.datetime.now()
                self._producer_client.send(
                    topic=KAFKA_URI["TOPIC"],
                    key=KAFKA_URI["KEY"].encode(),
                    value=msg.encode()
                ).add_callback(self.on_send_success).add_errback(self.on_send_error)
                now = datetime.datetime.now()
                # Flag sends that blocked for more than a minute.
                if (now - pre).seconds > 60:
                    print("60s")
                return True
            except KafkaTimeoutError as e:
                print("timeout")
            except Exception as e:
                print("exception")
            return False

    def on_send_success(self, metadata):
        print(metadata)

    def on_send_error(self, excp):
        print("excp")


kafka_client = kafkaClient()

if __name__ == "__main__":
    run()
```
Check CPU usage with top
Find the process with ps
```sh
ps -ef | grep cppla
ps aux | grep 26254
```
Per-thread CPU usage of the process
```sh
top -H -p 26254
# This program does not start any threads
```
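The same per-thread view can be read directly from `/proc`, which is handy when only a Python prompt is available. This is a rough sketch assuming a Linux `/proc` filesystem; `thread_cpu_ticks` is a hypothetical helper name. It sums the `utime` and `stime` fields (fields 14 and 15 of `/proc/<pid>/task/<tid>/stat`) for each thread.

```python
import os


def thread_cpu_ticks(pid):
    """Return {tid: utime + stime in clock ticks} for every thread of pid."""
    ticks = {}
    task_dir = f"/proc/{pid}/task"
    for tid in os.listdir(task_dir):
        try:
            with open(f"{task_dir}/{tid}/stat") as f:
                stat = f.read()
        except (FileNotFoundError, ProcessLookupError):
            continue  # the thread exited while we were scanning
        # The command name is wrapped in parentheses and may contain spaces,
        # so split on the last ')' before reading the numeric fields.
        fields = stat.rsplit(")", 1)[1].split()
        # fields[0] is the state (field 3), so utime/stime sit at 11 and 12.
        utime, stime = int(fields[11]), int(fields[12])
        ticks[int(tid)] = utime + stime
    return ticks


if __name__ == "__main__":
    for tid, used in sorted(thread_cpu_ticks(os.getpid()).items()):
        print(tid, used)
```

Calling it twice a few seconds apart and comparing the values shows which thread is burning CPU; a single entry means the process has no extra threads.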
Trace the process's execution stack
Note: the pstack installed via apt on Debian does not work (its output is garbled). I found the CentOS pstack script through Google:

```sh
# vim pstack.sh
#!/bin/sh

if test $# -ne 1; then
    echo "Usage: `basename $0 .sh` <process-id>" 1>&2
    exit 1
fi

if test ! -r /proc/$1; then
    echo "Process $1 not found." 1>&2
    exit 1
fi

# GDB doesn't allow "thread apply all bt" when the process isn't
# threaded; need to peek at the process to determine if that or the
# simpler "bt" should be used.
backtrace="bt"
if test -d /proc/$1/task ; then
    # Newer kernel; has a task/ directory.
    if test `/bin/ls /proc/$1/task | /usr/bin/wc -l` -gt 1 2>/dev/null ; then
        backtrace="thread apply all bt"
    fi
elif test -f /proc/$1/maps ; then
    # Older kernel; go by it loading libpthread.
    if /bin/grep -e libpthread /proc/$1/maps > /dev/null 2>&1 ; then
        backtrace="thread apply all bt"
    fi
fi

GDB=${GDB:-/usr/bin/gdb}

# Run GDB, strip out unwanted noise.
# --readnever is no longer used since .gdb_index is now in use.
$GDB --quiet -nx $GDBARGS /proc/$1/exe $1 <<EOF 2>&1 |
set width 0
set height 0
set pagination no
$backtrace
EOF
/bin/sed -n \
    -e 's/^\((gdb) \)*//' \
    -e '/^#/p' \
    -e '/^Thread/p'
```

Run it repeatedly with watch:

```sh
watch ./pstack.sh 26254
```
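pstack prints C-level frames, which say little about which Python code is running. As a complementary sketch (not something the original setup used), the standard-library `faulthandler` module can dump the Python-level stack when the process receives a signal. `enable_stack_dump` is a hypothetical helper name, and the choice of `SIGUSR1` is an assumption; as far as I understand, under gevent the dump only covers the greenlet that is currently running in each OS thread.

```python
import faulthandler
import signal
import sys


def enable_stack_dump(signum=signal.SIGUSR1, stream=sys.stderr):
    """Dump the Python traceback of all threads to `stream` when `signum` arrives.

    `stream` must stay open for as long as the handler is registered,
    because faulthandler writes to its file descriptor directly.
    """
    faulthandler.register(signum, file=stream, all_threads=True)


if __name__ == "__main__":
    import os
    enable_stack_dump()
    # Simulate `kill -USR1 <pid>` from another terminal.
    os.kill(os.getpid(), signal.SIGUSR1)
```

With `enable_stack_dump()` called once at startup in cppla.py, running `kill -USR1 26254` would print the current Python stack to the process's stderr.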
The stack trace shows gevent spending long stretches in epoll_wait and epoll_poll.
Trace system calls
```sh
# apt install strace
strace -f -p 26254
```
Output
```text
# epoll_wait busy loop
getpid() = 26254
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=439918115}) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=439940304}) = 0
epoll_wait(5, [{EPOLLOUT, {u32=7, u64=12884901895}}], 64, 0) = 1
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=439983956}) = 0
getpid() = 26254
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=440030506}) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=440052716}) = 0
epoll_wait(5, [{EPOLLOUT, {u32=7, u64=12884901895}}], 64, 0) = 1
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=440096569}) = 0
```
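When the strace output scrolls by too fast to read, a small script can summarize it. The sketch below is only an illustration: `summarize_strace` and `SAMPLE` are names chosen here, the sample lines are copied from the output above, and the regular expression assumes the plain `name(args) = result` layout that strace prints by default. It counts syscalls by name and counts the `epoll_wait` calls that were issued with a timeout of 0 and still returned events, which is the pattern of a loop that never blocks.

```python
import re
from collections import Counter

# Matches lines like: epoll_wait(5, [...], 64, 0) = 1
EPOLL_WAIT = re.compile(r"epoll_wait\((\d+), .*, (\d+), (-?\d+)\)\s*=\s*(-?\d+)")
# Syscall name at the start of a line, with an optional "[pid N]" prefix.
SYSCALL = re.compile(r"(?:\[pid\s+\d+\]\s+)?(\w+)\(")

SAMPLE = """\
getpid() = 26254
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=439918115}) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=439940304}) = 0
epoll_wait(5, [{EPOLLOUT, {u32=7, u64=12884901895}}], 64, 0) = 1
clock_gettime(CLOCK_MONOTONIC, {tv_sec=4174, tv_nsec=439983956}) = 0
"""


def summarize_strace(lines):
    """Return (syscall counts, number of zero-timeout epoll_wait calls that returned events)."""
    syscalls = Counter()
    zero_timeout_ready = 0
    for line in lines:
        name = SYSCALL.match(line.strip())
        if not name:
            continue  # comments, signals, or truncated lines
        syscalls[name.group(1)] += 1
        m = EPOLL_WAIT.search(line)
        if m and int(m.group(3)) == 0 and int(m.group(4)) > 0:
            zero_timeout_ready += 1
    return syscalls, zero_timeout_ready


if __name__ == "__main__":
    counts, busy = summarize_strace(SAMPLE.splitlines())
    print(counts.most_common())
    print("zero-timeout epoll_wait calls that returned events:", busy)
```

To use it on a real capture, the output of `strace -f -p 26254 -o trace.log` could be read from `trace.log` and passed in line by line.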
Debugging results
I tested my code again: high CPU with gevent==20.9.0 and kafka-python==1.4.7.
I tested my code again: normal CPU with gevent==1.5.0 and kafka-python==2.0.2.
But in the GitHub issue, another test with gevent==20.9.0 and kafka-python==1.4.7 was normal.
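After monkey patching, gevent issues a huge number of epoll_wait calls, which directly results in a busy loop. Fix: after downgrading to gevent==1.5.0 (greenlet==0.4.15), everything works normally.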
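In the strace output, every `epoll_wait` is called with a timeout of 0 and returns immediately with `EPOLLOUT`. The sketch below illustrates the general mechanism behind that kind of loop using only the standard library: an idle socket is almost always writable, so polling it for write readiness never blocks. This is my reading of the trace and an illustration only, not a confirmed diagnosis of the kafka-python/gevent interaction; `count_wakeups` is a hypothetical helper name.

```python
import selectors
import socket
import time


def count_wakeups(event, duration=0.2, timeout=0.05):
    """Count how often select() returns within `duration` seconds on an idle socket pair."""
    a, b = socket.socketpair()
    sel = selectors.DefaultSelector()
    sel.register(a, event)
    wakeups = 0
    deadline = time.monotonic() + duration
    try:
        while time.monotonic() < deadline:
            sel.select(timeout)
            wakeups += 1
    finally:
        sel.close()
        a.close()
        b.close()
    return wakeups


if __name__ == "__main__":
    # Write interest: the socket is writable right away, so select() returns
    # immediately on every iteration and the loop spins.
    print("write-interest wakeups:", count_wakeups(selectors.EVENT_WRITE))
    # Read interest: nothing is ever sent, so select() blocks for the timeout.
    print("read-interest wakeups:", count_wakeups(selectors.EVENT_READ))
```

The write-interest loop wakes up orders of magnitude more often than the read-interest loop, and a process stuck in the former shows up in top with one core fully used.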